from __future__ import annotations
import json
import re
import shlex
import sys
from collections.abc import Iterable
from typing import IO, Any
# Tokens that end one simple command and start the next. tokenize_command
# splits the flat token stream on these; the "\n" entry has the same value as
# the newline marker that _tokenize_raw inserts between physical lines.
COMMAND_SEPARATORS = frozenset({"&&", "||", "|", ";", "(", ")", "\n"})
def _strip_backticks(command: str) -> str:
return command.replace("`", " ")
# Sentinel token that _tokenize_raw appends between the tokens of consecutive
# physical lines; _drop_heredoc_bodies uses it to locate line boundaries.
_NEWLINE_MARKER = "\n"
def _tokenize_line(line: str) -> list[str]:
if not line.strip():
return []
lex = shlex.shlex(line, posix=True, punctuation_chars=True)
lex.whitespace_split = True
try:
return list(lex)
except ValueError:
return []
def _heredoc_tag(token: str) -> tuple[str, bool] | None:
if not token:
return None
is_dash = False
tag = token
if tag.startswith("-"):
is_dash = True
tag = tag[1:]
if len(tag) >= 2 and tag[0] == tag[-1] and tag[0] in ("'", '"'):
tag = tag[1:-1]
if not tag:
return None
return tag, is_dash
_HEREDOC_START_PATTERN = re.compile(r"<<(-?)\s*['\"]?([A-Za-z0-9_]\w*)['\"]?")
def _strip_heredocs_from_raw(command: str) -> str:
lines = command.split("\n")
result: list[str] = []
i = 0
while i < len(lines):
line = lines[i]
matches = list(_HEREDOC_START_PATTERN.finditer(line))
if not matches:
result.append(line)
i += 1
continue
result.append("")
i += 1
for match in matches:
is_dash = bool(match.group(1)) tag = match.group(2)
while i < len(lines):
if is_dash:
stripped = lines[i].rstrip().lstrip("\t")
else:
stripped = lines[i].rstrip()
if stripped == tag:
result.append("")
i += 1
break
result.append("")
i += 1
return "\n".join(result)
def _tokenize_raw(command: str) -> list[str]:
    """Turn raw command text into one flat token list.

    Backticks are replaced by spaces, backslash-newline continuations are
    joined, heredoc lines are blanked, and each remaining physical line is
    tokenized on its own. A newline marker is inserted between the tokens of
    consecutive lines (but not after the last line).

    Returns:
        The flat token list, or an empty list when the command is empty after
        backtick replacement.
    """
    without_backticks = _strip_backticks(command)
    if not without_backticks:
        return []

    joined = re.sub(r'\\\n', '', without_backticks)
    physical_lines = _strip_heredocs_from_raw(joined).split("\n")
    final_position = len(physical_lines) - 1

    flat: list[str] = []
    for position, text in enumerate(physical_lines):
        flat += _tokenize_line(text)
        if position < final_position:
            flat.append(_NEWLINE_MARKER)
    return flat
def _drop_heredoc_bodies(tokens: list[str]) -> list[str]:
output: list[str] = []
index = 0
while index < len(tokens):
tok = tokens[index]
if tok == "<<":
tag_info = (
_heredoc_tag(tokens[index + 1]) if index + 1 < len(tokens) else None
)
if tag_info is None:
index += 1
continue
tag, _ = tag_info
index += 2
while index < len(tokens) and tokens[index] != _NEWLINE_MARKER:
index += 1
while index < len(tokens):
if tokens[index] != _NEWLINE_MARKER:
index += 1
continue
index += 1
if index >= len(tokens):
break
if tokens[index] == tag:
next_index = index + 1
if (
next_index >= len(tokens)
or tokens[next_index] == _NEWLINE_MARKER
):
index += 1
break
continue
output.append(tok)
index += 1
return output
def tokenize_command(command: str) -> list[list[str]]:
    """Split a shell command string into per-command token lists.

    The command is tokenized, heredoc bodies are dropped, and the resulting
    stream is cut at every token found in ``COMMAND_SEPARATORS``. Separators
    are not included in the output and empty groups are never emitted.

    Returns:
        One token list per simple command, in order of appearance; an empty
        list when tokenization yields nothing.
    """
    stream = _tokenize_raw(command)
    if not stream:
        return []

    groups: list[list[str]] = []
    pending: list[str] = []
    for token in _drop_heredoc_bodies(stream):
        if token not in COMMAND_SEPARATORS:
            pending.append(token)
        elif pending:
            groups.append(pending)
            pending = []
    if pending:
        groups.append(pending)
    return groups
# Advice texts returned by _advice_for_tool. Each one names the git-prism tool
# to use in place of the raw git command that _classify_git_command detected.

# For the "get_change_manifest" target (git diff with a ref range).
ADVICE_GET_CHANGE_MANIFEST = (
    "git diff between refs returns raw text. git-prism alternatives:\n"
    " review_change(repo_path, base_ref, head_ref) "
    "--> full PR review (manifest + function context)\n"
    " get_change_manifest(repo_path, base_ref, head_ref) "
    "--> quick file-level scan\n"
    "Returns structured per-file change data with function-level semantic "
    "analysis."
)
# For the "get_commit_history" target (git log with a ref range).
ADVICE_GET_COMMIT_HISTORY = (
    "git log between refs returns raw text. git-prism alternative:\n"
    " get_commit_history(repo_path, base_ref, head_ref)\n"
    "Returns structured commit data with semantic analysis per commit."
)
# For the "get_function_context" target (git log with a -S/-G flag).
ADVICE_GET_FUNCTION_CONTEXT = (
    "git log -S/-G (pickaxe) returns raw text. git-prism alternative:\n"
    " get_function_context(repo_path, base_ref, head_ref)\n"
    "Returns callers, definitions, and test references for every changed "
    "function -- structured and cross-referenced."
)
# For the "get_file_snapshots" target when the git subcommand is "blame".
ADVICE_GET_FILE_SNAPSHOTS_BLAME = (
    "git blame returns raw line-by-line text. git-prism alternative:\n"
    " get_file_snapshots(repo_path, base_ref, head_ref, paths=[...], "
    "line_range=[start, end], include_before=true, include_after=true)\n"
    "Structured before/after content at specific line ranges."
)
# For the "get_file_snapshots" target with any subcommand other than "blame".
ADVICE_GET_FILE_SNAPSHOTS_SHOW = (
    "git show returns raw text. git-prism alternative:\n"
    " get_file_snapshots(repo_path, base_ref='<sha>^', head_ref='<sha>', "
    "paths=[...], include_before=true, include_after=true)\n"
    "Returns structured before/after file content at the commit boundary."
)
# Messages carried by "block" decisions; main() writes them to stderr and
# exits with status 2.

# Used when a Bash command contains `gh pr diff`.
BLOCK_GH_PR_DIFF = (
    "git-prism: gh pr diff returns raw text. Use git-prism instead:\n"
    " review_change(repo_path, base_ref, head_ref) "
    "--> full PR review (manifest + function context)\n"
    " get_change_manifest(repo_path, base_ref, head_ref) "
    "--> quick file-level scan\n"
    "Structured per-function change data -- same info, no diff noise."
)
# Used when the tool (or a command's first token) is mcp__github__get_commit.
BLOCK_MCP_GITHUB_GET_COMMIT = (
    "git-prism: mcp__github__get_commit returns raw diff text. Use "
    "git-prism instead:\n"
    " get_file_snapshots(repo_path, base_ref='<sha>^', head_ref='<sha>', "
    "paths=[...], include_before=true, include_after=true)\n"
    "Structured before/after content per file -- no raw patch format."
)
# Used when the tool (or a command's first token) is mcp__github__list_commits.
BLOCK_MCP_GITHUB_LIST_COMMITS = (
    "git-prism: mcp__github__list_commits returns a raw list. Use "
    "git-prism instead:\n"
    " get_commit_history(repo_path, base_ref, head_ref)\n"
    "Structured commits with per-commit semantic change analysis."
)
# Advice attached to the "advise" decision issued when _matches_gh_api_contents
# recognises a `gh api` call against a repository contents endpoint.
ADVICE_GET_FILE_SNAPSHOTS_GH_API = (
    "gh api repos/.../contents/...?ref=<sha> fetches raw file content from a "
    "specific ref via the GitHub API, bypassing git-prism entirely. "
    "git-prism alternative:\n"
    " get_file_snapshots(repo_path, base_ref='<ref>^', head_ref='<ref>', "
    "paths=[...], include_before=true, include_after=true)\n"
    "Returns structured before/after file content at the commit boundary -- "
    "no raw API response to parse."
)
# Searched against the arguments of `gh api` joined without separators:
# a repos/<owner>/<repo>/contents/ path followed somewhere by a `ref=` query
# parameter introduced by `?` or `&`.
_GH_API_CONTENTS_PATTERN = re.compile(r"repos/[^/]+/[^/]+/contents/.*[?&]ref=")
def _matches_gh_api_contents(command: str) -> bool:
    """Report whether *command* contains a ``gh api`` contents-at-ref call.

    A candidate command qualifies when its first two tokens are ``gh api``
    and its remaining tokens, concatenated without separators, match
    ``_GH_API_CONTENTS_PATTERN``.
    """
    return any(
        tokens[:2] == ["gh", "api"]
        and _GH_API_CONTENTS_PATTERN.search("".join(tokens[2:])) is not None
        for tokens in tokenize_command(command)
    )
def _has_ref_range(tokens: Iterable[str]) -> bool:
for tok in tokens:
if ".." in tok and tok not in ("..", "..."):
return True
return False
def _has_pickaxe_flag(tokens: Iterable[str]) -> bool:
for tok in tokens:
if tok in ("-S", "-G"):
return True
if tok.startswith("-S") or tok.startswith("-G"):
if len(tok) > 2:
return True
return False
def _classify_git_command(tokens: list[str]) -> str | None:
if len(tokens) < 2 or tokens[0] != "git":
return None
git_subcommand = tokens[1]
rest = tokens[2:]
if git_subcommand == "log" and _has_pickaxe_flag(rest):
return "get_function_context"
if git_subcommand == "diff" and _has_ref_range(rest):
return "get_change_manifest"
if git_subcommand == "log" and _has_ref_range(rest):
return "get_commit_history"
if git_subcommand == "blame":
return "get_file_snapshots"
if git_subcommand == "show":
return "get_file_snapshots"
return None
def _advice_for_tool(tool_name: str, git_subcommand: str | None = None) -> str:
    """Return the advice text for a redirect target.

    Args:
        tool_name: Name of the git-prism tool the command maps to.
        git_subcommand: The git subcommand that triggered the redirect. It
            only matters for ``"get_file_snapshots"``, where ``"blame"``
            selects the blame text and anything else the show text.

    Raises:
        ValueError: If *tool_name* is not a known redirect target.
    """
    if tool_name == "get_file_snapshots":
        if git_subcommand == "blame":
            return ADVICE_GET_FILE_SNAPSHOTS_BLAME
        return ADVICE_GET_FILE_SNAPSHOTS_SHOW

    fixed_advice = (
        ("get_change_manifest", ADVICE_GET_CHANGE_MANIFEST),
        ("get_commit_history", ADVICE_GET_COMMIT_HISTORY),
        ("get_function_context", ADVICE_GET_FUNCTION_CONTEXT),
    )
    for known_tool, advice in fixed_advice:
        if tool_name == known_tool:
            return advice
    raise ValueError(f"Unknown redirect tool: {tool_name!r}")
class Decision:
    """Outcome of evaluating one hook event.

    Instances use ``__slots__`` and therefore carry no per-instance
    ``__dict__``.
    """

    __slots__ = ("mode", "advice", "message", "tool_name")

    # Decision kind; this file uses "silent", "advise" and "block".
    mode: str
    # Text emitted as additional context for an "advise" decision.
    advice: str
    # Text written to stderr for a "block" decision.
    message: str
    # Name of the tool the decision refers to.
    tool_name: str

    def __init__(
        self,
        mode: str,
        advice: str = "",
        message: str = "",
        tool_name: str = "",
    ):
        """Store the four fields; every field except *mode* defaults to ""."""
        self.mode, self.advice = mode, advice
        self.message, self.tool_name = message, tool_name
# Shared "no redirect" decision: main() neither emits advice nor blocks for
# this mode and exits with status 0.
SILENT = Decision("silent")
def decide_redirect(hook_event_payload: dict[str, Any]) -> Decision:
    """Decide how to react to one hook event.

    * ``mcp__github__get_commit`` and ``mcp__github__list_commits`` tool calls
      are blocked with the matching message.
    * ``Bash`` tool calls are classified from ``tool_input["command"]``.
    * Everything else is silent.

    A ``Bash`` event whose ``tool_input`` is missing, ``None`` or not a
    mapping, or whose ``command`` is missing, empty or not a string, is
    treated as silent instead of raising.

    Args:
        hook_event_payload: Parsed hook event; the ``tool_name`` and
            ``tool_input`` keys are read.

    Returns:
        The decision for this event.
    """
    tool_name = hook_event_payload.get("tool_name", "")
    if tool_name == "mcp__github__get_commit":
        return Decision(
            "block",
            message=BLOCK_MCP_GITHUB_GET_COMMIT,
            tool_name=tool_name,
        )
    if tool_name == "mcp__github__list_commits":
        return Decision(
            "block",
            message=BLOCK_MCP_GITHUB_LIST_COMMITS,
            tool_name=tool_name,
        )
    if tool_name != "Bash":
        return SILENT

    # A JSON null or non-object tool_input must not crash classification.
    tool_input = hook_event_payload.get("tool_input")
    if not isinstance(tool_input, dict):
        return SILENT
    command = tool_input.get("command")
    if not isinstance(command, str) or not command:
        return SILENT
    return _decide_redirect_for_bash_command(command)
def _decide_redirect_for_bash_command(command: str) -> Decision:
    """Classify a Bash command string into a redirect decision.

    Checks are applied in this order:

    1. ``gh pr diff`` anywhere in the command -> block.
    2. ``gh api`` contents-at-ref call anywhere -> advise.
    3. For each candidate command in order: a first token naming one of the
       blocked GitHub MCP tools -> block; a redirectable git command ->
       advise, echoing the matched tokens.

    Returns:
        The first decision that applies, or ``SILENT`` when none does.
    """
    if _matches_gh_pr_diff(command):
        return Decision("block", message=BLOCK_GH_PR_DIFF, tool_name="Bash")

    if _matches_gh_api_contents(command):
        gh_api_advice = _advice_with_echo(
            ADVICE_GET_FILE_SNAPSHOTS_GH_API, ["gh", "api", "contents"]
        )
        return Decision("advise", advice=gh_api_advice, tool_name="Bash")

    blocked_heads = {
        "mcp__github__get_commit": BLOCK_MCP_GITHUB_GET_COMMIT,
        "mcp__github__list_commits": BLOCK_MCP_GITHUB_LIST_COMMITS,
    }
    for tokens in tokenize_command(command):
        if not tokens:
            continue

        head = tokens[0]
        block_message = blocked_heads.get(head)
        if block_message is not None:
            return Decision("block", message=block_message, tool_name=head)

        target = _classify_git_command(tokens)
        if target is not None:
            git_subcommand = tokens[1] if len(tokens) > 1 else None
            advice = _advice_with_echo(
                _advice_for_tool(target, git_subcommand), tokens
            )
            return Decision("advise", advice=advice, tool_name="Bash")
    return SILENT
def _advice_with_echo(base_advice: str, tokens: list[str]) -> str:
echoed = " ".join(tokens)
return f"{base_advice}\n\nYou ran: {echoed}"
def _matches_gh_pr_diff(command: str) -> bool:
    """Report whether any candidate command in *command* starts with ``gh pr diff``."""
    return any(
        tokens[:3] == ["gh", "pr", "diff"]
        for tokens in tokenize_command(command)
    )
def _is_functionally_empty(raw_stdin_content: str) -> bool:
if not raw_stdin_content:
return True
decoded_content = (
raw_stdin_content.replace("\\n", "\n").replace("\\t", "\t").replace("\\r", "\r")
)
return not decoded_content.strip()
def _read_payload(stdin: IO[str]) -> dict[str, Any] | None:
    """Read the whole of *stdin* and parse it as JSON.

    Returns:
        The parsed value, or ``None`` when the input is functionally empty or
        is not valid JSON. In the invalid-JSON case a notice is written to
        stderr. The parsed value is returned as-is; it is not checked to be a
        dict here.
    """
    text = stdin.read()
    if _is_functionally_empty(text):
        return None

    try:
        parsed: dict[str, Any] = json.loads(text)
    except json.JSONDecodeError:
        sys.stderr.write(
            "git-prism-redirect: malformed JSON on stdin -- skipping redirect\n"
        )
        return None
    return parsed
def _emit_advice(advice: str) -> None:
payload = {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"additionalContext": advice,
}
}
sys.stdout.write(json.dumps(payload))
sys.stdout.write("\n")
def main() -> int:
    """Run the hook: read the event from stdin, decide, and report.

    Returns:
        ``2`` when the decision is "block" and its message was written to
        stderr; ``0`` in every other case, including any unexpected error.
        Failures while reading or classifying are reported on stderr;
        failures while emitting the result are swallowed.
    """
    try:
        payload = _read_payload(sys.stdin)
    except Exception:
        sys.stderr.write(
            "git-prism-redirect: unexpected stdin error -- skipping redirect\n"
        )
        return 0

    # Covers both "nothing to do" (None) and non-object JSON payloads.
    if not isinstance(payload, dict):
        return 0

    try:
        decision = decide_redirect(payload)
    except Exception:
        sys.stderr.write(
            "git-prism-redirect: unexpected error classifying command; skipping redirect\n"
        )
        return 0

    exit_code = 0
    try:
        if decision.mode == "advise":
            _emit_advice(decision.advice)
        elif decision.mode == "block":
            sys.stderr.write(decision.message)
            sys.stderr.write("\n")
            exit_code = 2
    except Exception:
        # Emitting failed: fall back to exit status 0.
        return 0
    return exit_code
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())