import argparse
import csv
import json as json_lib
import os
import re
import ssl
import sys
from datetime import datetime
from typing import Dict, List, Optional
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# Build the TLS context shared by every urlopen() call in this module.
# certifi is optional: when it is importable its CA bundle is used,
# otherwise the interpreter's default trust store is used.
try:
    import certifi
except ImportError:
    SSL_CONTEXT = ssl.create_default_context()
else:
    SSL_CONTEXT = ssl.create_default_context(cafile=certifi.where())
class GitHubPRComments:
    """Client for reading and replying to the comments of a repository's pull requests.

    REST calls go to ``https://api.github.com`` through ``urllib``; the
    resolved/unresolved state of review threads is read from the GraphQL
    endpoint.  All requests use the module-level ``SSL_CONTEXT``.

    Fatal HTTP or connection failures print a diagnostic to stderr and end the
    process with ``sys.exit(1)``.  ``get_comment_by_id`` and
    ``_get_resolved_comment_ids`` are best-effort: they report the problem on
    stderr and return ``None`` / an empty set instead of exiting.
    """

    _NOT_FOUND_HINT = "Error: Not found. Check owner/repo/PR number."
    _NOT_FOUND_HINT_WITH_COMMENT = (
        "Error: Not found. Check owner/repo/PR number/comment ID."
    )

    def __init__(self, owner: str, repo: str, token: Optional[str] = None):
        """Store the repository coordinates and build the default REST headers.

        Args:
            owner: Repository owner (user or organisation).
            repo: Repository name.
            token: GitHub token; falls back to the ``GITHUB_TOKEN`` environment
                variable when omitted or empty.
        """
        self.owner = owner
        self.repo = repo
        self.token = token or os.getenv("GITHUB_TOKEN")
        self.base_url = "https://api.github.com"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "PR-Comments-Fetcher",
        }
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"

    # ------------------------------------------------------------------ #
    # Low-level HTTP helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _error_body(err) -> str:
        """Return the decoded body of an ``HTTPError``, or ``""`` if unreadable."""
        try:
            return err.read().decode("utf-8", errors="replace")
        except (AttributeError, OSError, ValueError):
            return ""

    def _report_http_error(
        self,
        status_code: int,
        reason: str = "",
        headers=None,
        body: str = "",
        not_found_hint: str = _NOT_FOUND_HINT,
    ) -> None:
        """Print a human-readable diagnostic for a failed HTTP response to stderr."""
        if status_code == 401:
            print("Error: Authentication failed (HTTP 401).", file=sys.stderr)
            if not self.token:
                print(
                    "No GitHub token provided. Set GITHUB_TOKEN environment variable or use --token flag.",
                    file=sys.stderr,
                )
                print(
                    "Create a token at: https://github.com/settings/tokens",
                    file=sys.stderr,
                )
            else:
                print(
                    "GitHub token is invalid or expired. Check your token or create a new one at: https://github.com/settings/tokens",
                    file=sys.stderr,
                )
        elif status_code == 404:
            print(not_found_hint, file=sys.stderr)
        elif status_code == 403:
            print("Error: Rate limit exceeded or access denied.", file=sys.stderr)
            remaining = (
                headers.get("X-RateLimit-Remaining") if headers is not None else None
            )
            if remaining is not None:
                print(f"Rate limit remaining: {remaining}", file=sys.stderr)
        else:
            suffix = f" - {reason}" if reason else ""
            print(f"Error: HTTP {status_code}{suffix}", file=sys.stderr)
        if body:
            print(f"Response: {body}", file=sys.stderr)

    def _request_json(
        self,
        req: Request,
        not_found_hint: str = _NOT_FOUND_HINT,
        ok_statuses: tuple = (200,),
    ):
        """Send *req* and return the decoded JSON body; exit the process on failure.

        ``urlopen`` raises ``HTTPError`` for 4xx/5xx responses, so the detailed
        401/403/404 diagnostics are produced from the exception handler; the
        in-band status check only guards against unexpected success codes.
        """
        try:
            with urlopen(req, context=SSL_CONTEXT) as response:
                status_code = response.getcode()
                raw = response.read().decode("utf-8")
                if status_code not in ok_statuses:
                    self._report_http_error(
                        status_code,
                        headers=response.headers,
                        body=raw,
                        not_found_hint=not_found_hint,
                    )
                    sys.exit(1)
                return json_lib.loads(raw)
        except HTTPError as e:
            self._report_http_error(
                e.code,
                reason=str(e.reason),
                headers=e.headers,
                body=self._error_body(e),
                not_found_hint=not_found_hint,
            )
            sys.exit(1)
        except URLError as e:
            print(
                f"Error: Failed to connect to GitHub API: {e.reason}",
                file=sys.stderr,
            )
            sys.exit(1)

    def _make_request(
        self, endpoint: str, method: str = "GET", data: Optional[Dict] = None
    ):
        """Call a REST endpoint relative to ``self.base_url``.

        Args:
            endpoint: Path starting with ``/``.
            method: ``"POST"`` sends *data* as a JSON body and returns the
                decoded response.  Any other value performs a paginated GET.
            data: JSON-serialisable payload for POST requests.

        Returns:
            For POST, the decoded JSON response.  Otherwise a list with the
            items of every page concatenated (100 items are requested per page
            and paging stops at the first empty or short page).

        Exits the process with status 1 on any HTTP or connection error.
        """
        if method == "POST":
            payload = json_lib.dumps(data).encode("utf-8") if data else None
            # Without an explicit Content-Type urllib labels the body as
            # application/x-www-form-urlencoded.
            post_headers = {**self.headers, "Content-Type": "application/json"}
            req = Request(
                f"{self.base_url}{endpoint}",
                data=payload,
                headers=post_headers,
                method="POST",
            )
            return self._request_json(
                req,
                not_found_hint=self._NOT_FOUND_HINT_WITH_COMMENT,
                ok_statuses=(200, 201),
            )

        per_page = 100
        separator = "&" if "?" in endpoint else "?"
        collected = []
        page = 1
        while True:
            url = (
                f"{self.base_url}{endpoint}{separator}"
                f"page={page}&per_page={per_page}"
            )
            items = self._request_json(Request(url, headers=self.headers))
            if not items:
                break
            collected.extend(items)
            if len(items) < per_page:
                break
            page += 1
        return collected

    # ------------------------------------------------------------------ #
    # Read operations
    # ------------------------------------------------------------------ #

    def get_pr_info(self, pr_number: int) -> Dict:
        """Return the pull-request object for *pr_number*; exit on any error."""
        url = f"{self.base_url}/repos/{self.owner}/{self.repo}/pulls/{pr_number}"
        return self._request_json(Request(url, headers=self.headers))

    def get_issue_comments(self, pr_number: int) -> List[Dict]:
        """Return every conversation (issue-style) comment on the pull request."""
        endpoint = f"/repos/{self.owner}/{self.repo}/issues/{pr_number}/comments"
        return self._make_request(endpoint)

    def get_review_comments(self, pr_number: int) -> List[Dict]:
        """Return every inline (diff) review comment on the pull request."""
        endpoint = f"/repos/{self.owner}/{self.repo}/pulls/{pr_number}/comments"
        return self._make_request(endpoint)

    def get_conversations(self, pr_number: int) -> List[Dict]:
        """Placeholder: always returns an empty list."""
        return []

    def get_reviews(self, pr_number: int) -> List[Dict]:
        """Return every review submitted on the pull request."""
        endpoint = f"/repos/{self.owner}/{self.repo}/pulls/{pr_number}/reviews"
        return self._make_request(endpoint)

    def get_comment_by_id(self, comment_id: int, comment_type: str) -> Optional[Dict]:
        """Fetch a single comment by ID without terminating the process.

        Args:
            comment_id: Numeric comment ID.
            comment_type: ``"issue"`` or ``"review_comment"``.  ``"review"`` is
                not supported here (a warning is printed) and unknown values
                print an error.

        Returns:
            The comment object, or ``None`` when it does not exist, the type is
            unsupported, or the request failed (the reason goes to stderr).
        """
        if comment_type == "review":
            print(
                "Warning: Getting review by ID requires PR number. Use fetch_all_comments() instead.",
                file=sys.stderr,
            )
            return None

        segment = {"issue": "issues", "review_comment": "pulls"}.get(comment_type)
        if segment is None:
            print(f"Error: Unknown comment type: {comment_type}", file=sys.stderr)
            return None

        url = (
            f"{self.base_url}/repos/{self.owner}/{self.repo}"
            f"/{segment}/comments/{comment_id}"
        )
        try:
            with urlopen(
                Request(url, headers=self.headers), context=SSL_CONTEXT
            ) as response:
                status_code = response.getcode()
                if status_code == 200:
                    return json_lib.loads(response.read().decode("utf-8"))
                print(f"Error: HTTP {status_code}", file=sys.stderr)
                return None
        except HTTPError as e:
            # A missing comment is an expected outcome, not worth a message.
            if e.code != 404:
                print(f"Error: HTTP {e.code} - {e.reason}", file=sys.stderr)
            return None
        except URLError as e:
            print(f"Error: {e.reason}", file=sys.stderr)
            return None
        except Exception as e:  # deliberate best-effort: report, never crash
            print(f"Error: {e}", file=sys.stderr)
            return None

    def _get_resolved_comment_ids(self, pr_number: int) -> set:
        """Return the database IDs of comments that belong to resolved review threads.

        Review threads are read through GraphQL and paged with a cursor, 100
        threads at a time, so pull requests with more than 100 threads are
        covered.  Only the first 100 comments of each thread are inspected.

        Best-effort: without a token, or on any HTTP/GraphQL failure, a warning
        is printed to stderr and an empty set is returned.
        """
        if not self.token:
            print(
                "Warning: No GitHub token provided. Cannot check resolved status. Showing all comments.",
                file=sys.stderr,
            )
            return set()

        graphql_url = "https://api.github.com/graphql"
        query = """
        query($owner: String!, $repo: String!, $prNumber: Int!, $cursor: String) {
          repository(owner: $owner, name: $repo) {
            pullRequest(number: $prNumber) {
              reviewThreads(first: 100, after: $cursor) {
                pageInfo {
                  hasNextPage
                  endCursor
                }
                nodes {
                  isResolved
                  comments(first: 100) {
                    nodes {
                      id
                      databaseId
                    }
                  }
                }
              }
            }
          }
        }
        """
        graphql_headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "User-Agent": "PR-Comments-Fetcher",
            "Authorization": f"Bearer {self.token}",
        }

        resolved_ids = set()
        total_threads = 0
        resolved_threads = 0
        cursor = None
        try:
            while True:
                variables = {
                    "owner": self.owner,
                    "repo": self.repo,
                    "prNumber": pr_number,
                    "cursor": cursor,
                }
                payload = json_lib.dumps(
                    {"query": query, "variables": variables}
                ).encode("utf-8")
                req = Request(
                    graphql_url, data=payload, headers=graphql_headers, method="POST"
                )
                with urlopen(req, context=SSL_CONTEXT) as response:
                    raw = response.read().decode("utf-8")
                    if response.getcode() != 200:
                        print(
                            f"GraphQL API returned status {response.getcode()}: {raw}",
                            file=sys.stderr,
                        )
                        return set()
                data = json_lib.loads(raw)
                if "errors" in data:
                    print(
                        f"GraphQL API errors: {json_lib.dumps(data['errors'], indent=2)}",
                        file=sys.stderr,
                    )
                    return set()

                # Any level may be null in a GraphQL response; treat as empty.
                repository = (data.get("data") or {}).get("repository") or {}
                pull_request = repository.get("pullRequest") or {}
                connection = pull_request.get("reviewThreads") or {}

                for thread in connection.get("nodes") or []:
                    total_threads += 1
                    if not thread.get("isResolved", False):
                        continue
                    resolved_threads += 1
                    thread_comments = (thread.get("comments") or {}).get("nodes") or []
                    for comment in thread_comments:
                        database_id = comment.get("databaseId")
                        if database_id:
                            resolved_ids.add(database_id)

                page_info = connection.get("pageInfo") or {}
                cursor = page_info.get("endCursor")
                if not page_info.get("hasNextPage") or not cursor:
                    break
        except Exception as e:  # deliberate best-effort: degrade to "unknown"
            print(
                f"Warning: Could not fetch resolved status via GraphQL: {e}",
                file=sys.stderr,
            )
            return set()

        if total_threads > 0:
            print(
                f"Found {total_threads} review threads, {resolved_threads} resolved, {len(resolved_ids)} resolved comments",
                file=sys.stderr,
            )
        else:
            print(
                "Warning: No review threads found via GraphQL. Showing all comments.",
                file=sys.stderr,
            )
        return resolved_ids

    def fetch_all_comments(self, pr_number: int, status: str = "open") -> Dict:
        """Collect PR metadata, issue comments, review comments and reviews.

        Args:
            pr_number: Pull-request number.
            status: ``"open"`` keeps review comments outside resolved threads,
                ``"resolved"`` keeps only those inside resolved threads, and
                ``"all"`` skips filtering.  Only review comments are filtered.

        Returns:
            Dict with keys ``pr_info``, ``issue_comments``, ``review_comments``
            and ``reviews``.
        """
        print(f"Fetching comments for PR #{pr_number}...", file=sys.stderr)
        pr_info = self.get_pr_info(pr_number)
        issue_comments = self.get_issue_comments(pr_number)
        review_comments = self.get_review_comments(pr_number)
        reviews = self.get_reviews(pr_number)

        if status != "all":
            resolved_ids = self._get_resolved_comment_ids(pr_number)
            want_resolved = status == "resolved"
            if status in ("open", "resolved"):
                # An empty resolved set classifies every comment as open.
                review_comments = [
                    comment
                    for comment in review_comments
                    if (comment.get("id") in resolved_ids) == want_resolved
                ]
            else:
                review_comments = []
            if status == "open":
                print(
                    "Filtering to show only open (unresolved) comments...",
                    file=sys.stderr,
                )
            elif status == "resolved":
                print("Filtering to show only resolved comments...", file=sys.stderr)

        return {
            "pr_info": pr_info,
            "issue_comments": issue_comments,
            "review_comments": review_comments,
            "reviews": reviews,
        }

    # ------------------------------------------------------------------ #
    # Write operations
    # ------------------------------------------------------------------ #

    def reply_to_comment(
        self, pr_number: int, comment_id: int, comment_type: str, reply_body: str
    ) -> Dict:
        """Post *reply_body* in response to a comment and return the created object.

        ``review_comment`` replies go to the review comment's ``replies``
        endpoint; ``issue`` and ``review`` replies are posted as new issue
        comments on the pull request (a warning is printed for ``review``).
        Exits the process when no token is configured, the type is unknown, or
        the request fails.
        """
        if not self.token:
            print(
                "Error: GitHub token is required for posting replies", file=sys.stderr
            )
            sys.exit(1)

        issue_endpoint = f"/repos/{self.owner}/{self.repo}/issues/{pr_number}/comments"
        if comment_type == "review_comment":
            endpoint = f"/repos/{self.owner}/{self.repo}/pulls/{pr_number}/comments/{comment_id}/replies"
            payload = {"body": reply_body}
        elif comment_type == "issue":
            endpoint = issue_endpoint
            # NOTE(review): "in_reply_to" is kept from the original payload;
            # confirm the issue-comments endpoint actually honours it.
            payload = {"body": reply_body, "in_reply_to": comment_id}
        elif comment_type == "review":
            print(
                "Warning: Reviews cannot be directly replied to. Posting as issue comment instead.",
                file=sys.stderr,
            )
            endpoint = issue_endpoint
            payload = {"body": reply_body}
        else:
            print(f"Error: Unknown comment type: {comment_type}", file=sys.stderr)
            sys.exit(1)
        return self._make_request(endpoint, method="POST", data=payload)

    # ------------------------------------------------------------------ #
    # Formatting helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _author_login(item: Dict) -> str:
        """Return ``item["user"]["login"]`` or ``"Unknown"`` when unavailable."""
        user = item.get("user")
        if isinstance(user, dict):
            return user.get("login", "Unknown")
        return "Unknown"

    @staticmethod
    def _one_line(text: Optional[str]) -> str:
        """Collapse *text* onto one line; ``None`` becomes an empty string."""
        return (text or "").replace("\n", " ").replace("\r", "")

    @staticmethod
    def format_comment(comment: Dict, comment_type: str = "comment") -> str:
        """Render one comment or review as a multi-line text block.

        Args:
            comment: Comment or review object as returned by the REST API.
            comment_type: ``"review_comment"`` adds file/line/diff context,
                ``"review"`` adds the review state; anything else is rendered
                as a plain comment.

        Returns:
            The formatted text, terminated by an 80-dash separator line.
        """
        user = comment.get("user")
        if isinstance(user, dict):
            user_login = user.get("login", "Unknown")
        elif user:
            user_login = str(user)
        else:
            user_login = "Unknown"

        # Reviews carry "submitted_at" rather than "created_at".
        timestamp = comment.get("created_at") or comment.get("submitted_at") or ""
        body = comment.get("body") or ""

        if timestamp:
            try:
                parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
                date_str = parsed.strftime("%Y-%m-%d %H:%M:%S UTC")
            except ValueError:
                date_str = timestamp
        else:
            date_str = "Unknown date"

        parts = []
        if comment_type == "review_comment":
            line = comment.get("line", "?")
            original_line = comment.get("original_line")
            start_line = comment.get("start_line")
            in_reply_to_id = comment.get("in_reply_to_id")
            review_id = comment.get("pull_request_review_id")
            diff_hunk = comment.get("diff_hunk", "")

            parts.append(f"[Review Comment] {user_login} on {date_str}\n")
            location = f"File: {comment.get('path', 'Unknown file')}"
            if line:
                location += f" (line {line})"
            if original_line and original_line != line:
                location += f" (original line {original_line})"
            if start_line:
                location += f" (start line {start_line})"
            parts.append(location + "\n")
            if in_reply_to_id:
                parts.append(f"Reply to comment ID: {in_reply_to_id}\n")
            if review_id:
                parts.append(f"Review ID: {review_id}\n")
            if diff_hunk:
                parts.append(f"\nCode Context:\n{diff_hunk}\n")
            parts.append(f"\nComment:\n{body}\n" if body else "\n(No comment text)\n")
        elif comment_type == "review":
            state = comment.get("state", "unknown")
            parts.append(f"[Review] {user_login} ({state}) on {date_str}\n")
            parts.append(f"\nReview Body:\n{body}\n" if body else "\n(No review body)\n")
        else:
            parts.append(f"[Comment] {user_login} on {date_str}\n")
            parts.append(f"\n{body}\n" if body else "\n(No comment text)\n")

        return "".join(parts) + "\n" + "-" * 80 + "\n"

    # ------------------------------------------------------------------ #
    # Output views
    # ------------------------------------------------------------------ #

    def _write_csv_rows(self, writer, data: Dict) -> None:
        """Write the PR header and one section per comment kind to *writer*."""
        pr_info = data["pr_info"]
        issue_comments = data["issue_comments"]
        review_comments = data["review_comments"]
        reviews = data["reviews"]

        writer.writerow(
            ["PR Number", "PR Title", "Author", "State", "Created At", "URL"]
        )
        writer.writerow(
            [
                pr_info["number"],
                pr_info["title"],
                self._author_login(pr_info),
                pr_info["state"],
                pr_info["created_at"],
                pr_info["html_url"],
            ]
        )
        writer.writerow([])

        if issue_comments:
            writer.writerow(
                ["Comment Type", "ID", "Author", "Created At", "Body", "URL"]
            )
            for comment in issue_comments:
                writer.writerow(
                    [
                        "issue_comment",
                        comment.get("id", ""),
                        self._author_login(comment),
                        comment.get("created_at", ""),
                        self._one_line(comment.get("body")),
                        comment.get("html_url", ""),
                    ]
                )
            writer.writerow([])

        if review_comments:
            writer.writerow(
                [
                    "Comment Type",
                    "ID",
                    "Author",
                    "Created At",
                    "File",
                    "Line",
                    "Body",
                    "URL",
                ]
            )
            for comment in review_comments:
                writer.writerow(
                    [
                        "review_comment",
                        comment.get("id", ""),
                        self._author_login(comment),
                        comment.get("created_at", ""),
                        comment.get("path", ""),
                        comment.get("line", ""),
                        self._one_line(comment.get("body")),
                        comment.get("html_url", ""),
                    ]
                )
            writer.writerow([])

        if reviews:
            writer.writerow(
                [
                    "Comment Type",
                    "ID",
                    "Author",
                    "State",
                    "Created At",
                    "Body",
                    "URL",
                ]
            )
            for review in reviews:
                writer.writerow(
                    [
                        "review",
                        review.get("id", ""),
                        self._author_login(review),
                        review.get("state", ""),
                        review.get("submitted_at") or review.get("created_at", ""),
                        self._one_line(review.get("body")),
                        review.get("html_url", ""),
                    ]
                )

    def print_csv_view(self, data: Dict, output_file: Optional[str] = None):
        """Write *data* as CSV to *output_file*, or to stdout when it is falsy.

        Bodies are flattened to one line and a missing (null) body is written
        as an empty field.  The "CSV exported" confirmation is printed to
        stderr only after the file has been written successfully.
        """
        if not output_file:
            self._write_csv_rows(csv.writer(sys.stdout), data)
            return
        with open(output_file, "w", newline="", encoding="utf-8") as handle:
            self._write_csv_rows(csv.writer(handle), data)
        print(f"CSV exported to {output_file}", file=sys.stderr)

    def print_table_view(self, data: Dict):
        """Print issue comments and review comments as fixed-width tables on stdout.

        Reviews are not part of this view; the summary line counts only issue
        comments and review comments.
        """
        pr_info = data["pr_info"]
        issue_comments = data["issue_comments"]
        review_comments = data["review_comments"]
        heavy_rule = "=" * 100
        light_rule = "─" * 100

        def truncate(text: Optional[str], max_len: int = 60) -> str:
            flat = self._one_line(text)
            if len(flat) > max_len:
                return flat[: max_len - 3] + "..."
            return flat

        print(f"\n{heavy_rule}")
        print(f"PR #{pr_info['number']}: {pr_info['title']}")
        print(
            f"Author: {self._author_login(pr_info)} | State: {pr_info['state']} | Created: {pr_info['created_at']}"
        )
        print(f"URL: {pr_info['html_url']}")
        print(f"{heavy_rule}\n")

        if issue_comments:
            print(f"\n{light_rule}")
            print(f"ISSUE COMMENTS ({len(issue_comments)} total)")
            print(light_rule)
            print(f"{'ID':<12} {'Author':<20} {'Date':<20} {'Comment':<48}")
            print(light_rule)
            for comment in issue_comments:
                comment_id = str(comment.get("id", "N/A"))
                author = self._author_login(comment)
                created_at = comment.get("created_at")
                created = created_at[:19] if created_at else "Unknown"
                body = truncate(comment.get("body"), 48)
                print(f"{comment_id:<12} {author:<20} {created:<20} {body:<48}")
            print(f"{light_rule}\n")

        if review_comments:
            print(f"\n{light_rule}")
            print(f"REVIEW COMMENTS - Code Comments ({len(review_comments)} total)")
            print(light_rule)
            print(f"{'ID':<12} {'Author':<20} {'File':<35} {'Line':<8} {'Comment':<25}")
            print(light_rule)
            for comment in review_comments:
                comment_id = str(comment.get("id", "N/A"))
                author = self._author_login(comment)
                path = comment.get("path") or "Unknown"
                if len(path) > 33:
                    # Keep the tail: the file name matters more than the root.
                    path = "..." + path[-30:]
                raw_line = comment.get("line")
                line = "?" if raw_line is None else str(raw_line)
                body = truncate(comment.get("body"), 25)
                print(f"{comment_id:<12} {author:<20} {path:<35} {line:<8} {body:<25}")
            print(f"{light_rule}\n")

        total = len(issue_comments) + len(review_comments)
        print(heavy_rule)
        print(
            f"SUMMARY: {len(issue_comments)} Issue Comments | {len(review_comments)} Review Comments | Total: {total}"
        )
        print(f"{heavy_rule}\n")

    def print_all_comments(self, data: Dict):
        """Print the PR header, every comment/review in full, and a summary on stdout."""
        pr_info = data["pr_info"]
        issue_comments = data["issue_comments"]
        review_comments = data["review_comments"]
        reviews = data["reviews"]
        rule = "=" * 80

        print(f"\n{rule}")
        print(f"PR #{pr_info['number']}: {pr_info['title']}")
        print(f"Author: {self._author_login(pr_info)}")
        print(f"State: {pr_info['state']}")
        print(f"Created: {pr_info['created_at']}")
        print(f"URL: {pr_info['html_url']}")
        print(f"{rule}\n")

        sections = (
            (f"ISSUE COMMENTS ({len(issue_comments)} total)", issue_comments, "issue"),
            (
                f"REVIEW COMMENTS (Code Comments) ({len(review_comments)} total)",
                review_comments,
                "review_comment",
            ),
            (f"REVIEWS ({len(reviews)} total)", reviews, "review"),
        )
        for title, items, kind in sections:
            if not items:
                continue
            print(f"\n{rule}")
            print(title)
            print(f"{rule}\n")
            for item in items:
                print(self.format_comment(item, kind))

        total = len(issue_comments) + len(review_comments) + len(reviews)
        print(f"\n{rule}")
        print("SUMMARY")
        print(rule)
        print(f"Issue Comments: {len(issue_comments)}")
        print(f"Review Comments: {len(review_comments)}")
        print(f"Reviews: {len(reviews)}")
        print(f"Total: {total}")
        print(f"{rule}\n")
def parse_pr_url(url: str) -> tuple:
    """Split a GitHub pull-request URL into ``(owner, repo, pr_number)``.

    The URL only has to contain ``github.com/<owner>/<repo>/pull/<digits>``
    somewhere; anything before or after that fragment is ignored.

    Raises:
        ValueError: If the fragment is not present in *url*.
    """
    found = re.search(r"github\.com/([^/]+)/([^/]+)/pull/(\d+)", url)
    if found is None:
        raise ValueError(f"Invalid PR URL format: {url}")
    owner, repo, number = found.groups()
    return owner, repo, int(number)
def _login_of(item) -> str:
    """Return ``item["user"]["login"]`` or ``"Unknown"`` when it is unavailable."""
    user = item.get("user")
    if isinstance(user, dict):
        return user.get("login", "Unknown")
    return "Unknown"


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the PR comments tool."""
    parser = argparse.ArgumentParser(
        description="Fetch all comments from a GitHub Pull Request",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "owner_or_url",
        nargs="?",
        help="Repository owner (username/org) or full PR URL",
    )
    parser.add_argument(
        "repo",
        nargs="?",
        help="Repository name (required if owner is provided, not if URL is provided)",
    )
    parser.add_argument(
        "pr_number",
        nargs="?",
        type=int,
        help="Pull request number (required if owner/repo provided, not if URL is provided)",
    )
    parser.add_argument(
        "--token",
        help="GitHub personal access token (or set GITHUB_TOKEN env var)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON (useful for AI/automation)",
    )
    parser.add_argument(
        "--table",
        action="store_true",
        help="Output as formatted table (default: detailed text format)",
    )
    parser.add_argument(
        "--csv",
        nargs="?",
        const="",
        metavar="FILE",
        help="Output as CSV (optionally specify output file, default: stdout)",
    )
    parser.add_argument(
        "--detail",
        type=int,
        metavar="COMMENT_ID",
        help="Get details for a specific comment by ID",
    )
    parser.add_argument(
        "--comment-type",
        choices=["issue", "review_comment", "review"],
        help="Comment type for --detail or --reply (required with --detail or --reply)",
    )
    parser.add_argument(
        "--reply",
        type=int,
        metavar="COMMENT_ID",
        help="Reply to a specific comment by ID",
    )
    parser.add_argument(
        "--reply-body",
        help="Reply body text (required with --reply)",
    )
    parser.add_argument(
        "--reply-body-file",
        help="File containing reply body text (alternative to --reply-body)",
    )
    parser.add_argument(
        "--status",
        choices=["open", "resolved", "all"],
        default="open",
        help="Filter comments by status: 'open' (default, unresolved comments), 'resolved', or 'all' (all comments). Note: Requires --token for status filtering to work.",
    )
    return parser


def _build_json_output(data) -> dict:
    """Reduce the fetched PR data to the compact structure printed by ``--json``."""
    pr_info = data["pr_info"]
    issue_comments = data["issue_comments"]
    review_comments = data["review_comments"]
    reviews = data["reviews"]
    return {
        "pr": {
            "number": pr_info["number"],
            "title": pr_info["title"],
            "author": _login_of(pr_info),
            "state": pr_info["state"],
            "created_at": pr_info["created_at"],
            "url": pr_info["html_url"],
        },
        "comments": {
            "issue_comments": [
                {
                    "id": c.get("id"),
                    "user": _login_of(c),
                    "created_at": c.get("created_at"),
                    "body": c.get("body") or "",
                    "url": c.get("html_url"),
                }
                for c in issue_comments
            ],
            "review_comments": [
                {
                    "id": c.get("id"),
                    "user": _login_of(c),
                    "created_at": c.get("created_at"),
                    "path": c.get("path"),
                    "line": c.get("line"),
                    "body": c.get("body") or "",
                    "diff_hunk": c.get("diff_hunk"),
                    "url": c.get("html_url"),
                }
                for c in review_comments
            ],
            "reviews": [
                {
                    "id": r.get("id"),
                    "user": _login_of(r),
                    "state": r.get("state"),
                    # Reviews carry "submitted_at" rather than "created_at".
                    "created_at": r.get("submitted_at") or r.get("created_at"),
                    "body": r.get("body") or "",
                    "url": r.get("html_url"),
                }
                for r in reviews
            ],
        },
        "summary": {
            "total_issue_comments": len(issue_comments),
            "total_review_comments": len(review_comments),
            "total_reviews": len(reviews),
            "total_comments": len(issue_comments)
            + len(review_comments)
            + len(reviews),
        },
    }


def main():
    """Command-line entry point.

    Resolves the target pull request from either a PR URL or
    ``owner repo pr_number``, then runs exactly one action: ``--reply`` posts a
    reply, ``--detail`` shows a single comment, otherwise all comments are
    fetched and printed as JSON, CSV, a table, or detailed text.  Usage errors
    print a message to stderr and exit with status 1.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if not args.owner_or_url:
        parser.print_help()
        sys.exit(1)

    if args.owner_or_url.startswith("http"):
        try:
            owner, repo, pr_number = parse_pr_url(args.owner_or_url)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        if not args.repo or not args.pr_number:
            print(
                "Error: Must provide both repo and pr_number when using owner format",
                file=sys.stderr,
            )
            parser.print_help()
            sys.exit(1)
        owner = args.owner_or_url
        repo = args.repo
        pr_number = args.pr_number

    client = GitHubPRComments(owner, repo, token=args.token)

    if args.reply is not None:
        if not args.comment_type:
            print("Error: --comment-type is required with --reply", file=sys.stderr)
            sys.exit(1)
        if not args.reply_body and not args.reply_body_file:
            print(
                "Error: --reply-body or --reply-body-file is required with --reply",
                file=sys.stderr,
            )
            sys.exit(1)
        # The client also picks the token up from GITHUB_TOKEN, so check the
        # client rather than the --token flag alone.
        if not client.token:
            print(
                "Error: --token (or GITHUB_TOKEN) is required for --reply",
                file=sys.stderr,
            )
            sys.exit(1)

        reply_body = args.reply_body
        if args.reply_body_file:
            try:
                with open(args.reply_body_file, "r", encoding="utf-8") as f:
                    reply_body = f.read()
            except OSError as e:
                print(
                    f"Error: Could not read reply body file {args.reply_body_file}: {e}",
                    file=sys.stderr,
                )
                sys.exit(1)

        result = client.reply_to_comment(
            pr_number, args.reply, args.comment_type, reply_body
        )
        print("Reply posted successfully!")
        print(f"Comment ID: {result.get('id')}")
        print(f"URL: {result.get('html_url')}")
        sys.exit(0)

    if args.detail is not None:
        if not args.comment_type:
            print("Error: --comment-type is required with --detail", file=sys.stderr)
            sys.exit(1)
        comment = client.get_comment_by_id(args.detail, args.comment_type)
        if not comment:
            print(f"Error: Comment {args.detail} not found", file=sys.stderr)
            sys.exit(1)
        if args.json:
            print(json_lib.dumps(comment, indent=2))
        else:
            print(client.format_comment(comment, args.comment_type))
        sys.exit(0)

    data = client.fetch_all_comments(pr_number, status=args.status)
    if args.json:
        print(json_lib.dumps(_build_json_output(data), indent=2))
    elif args.csv is not None:
        # "--csv" without a file name yields "", meaning "write to stdout".
        client.print_csv_view(data, args.csv or None)
    elif args.table:
        client.print_table_view(data)
    else:
        client.print_all_comments(data)


if __name__ == "__main__":
    main()