import argparse
import json
import os
import re
import shlex
import sys
_SUBSHELL_RE = re.compile(r'\$\(([^)]*)\)|<\(([^)]*)\)|`([^`]*)`')
_HEREDOC_MARKER_RE = re.compile(r'<<-?\s*\\?[\'"]?(\w+)[\'"]?')
_SEQ_SPLIT_RE = re.compile(r'\s*(?:&&|\|\||;)\s*')
_PIPE_SPLIT_RE = re.compile(r'\s*(?<!\|)\|(?!\|)\s*')
def _mask_quotes(s):
out = list(s)
i = 0
n = len(s)
while i < n:
if s[i] == "'":
j = s.find("'", i + 1)
if j == -1:
j = n - 1
for k in range(i, j + 1):
out[k] = ' '
i = j + 1
elif s[i] == '"':
j = i + 1
while j < n and s[j] != '"':
if s[j] == '\\' and j + 1 < n:
j += 1
j += 1
for k in range(i, min(j + 1, n)):
out[k] = ' '
i = j + 1
else:
i += 1
return ''.join(out)
def _quote_aware_split(cmd, sep_re):
masked = _mask_quotes(cmd)
parts = []
last = 0
for m in sep_re.finditer(masked):
parts.append(cmd[last:m.start()])
last = m.end()
parts.append(cmd[last:])
return parts
def _strip_heredoc_bodies(cmd_string):
lines = cmd_string.split('\n')
result = []
skip_until = None
for line in lines:
if skip_until is not None:
if line.strip() == skip_until:
skip_until = None
result.append(line)
continue
result.append(line)
m = _HEREDOC_MARKER_RE.search(line)
if m:
skip_until = m.group(1)
return '\n'.join(result)
ALLOWED = {"make", "git", "gh", "catenary", "kubectl", "cp", "rm", "rmdir", "touch", "mkdir", "mv", "chmod", "sleep", "cd", "true", "false", "which"}
DENIED_GIT = {"grep", "ls-files", "ls-tree"}
PIPELINE_SAFE = {"grep", "egrep", "fgrep", "head", "tail", "wc", "jq", "awk", "sort", "sed", "tr", "cut", "uniq", "tee"}
GUIDANCE = {
"rg": "Use Catenary's grep tool instead.",
"ag": "Use Catenary's grep tool instead.",
"ack": "Use Catenary's grep tool instead.",
"fd": "Use Catenary's grep tool instead.",
"grep": "Use Catenary's grep tool instead.",
"egrep": "Use Catenary's grep tool instead.",
"fgrep": "Use Catenary's grep tool instead.",
"rgrep": "Use Catenary's grep tool instead.",
"zgrep": "Use Catenary's grep tool instead.",
"ls": "Use Catenary's glob tool instead.",
"dir": "Use Catenary's glob tool instead.",
"tree": "Use Catenary's glob tool instead.",
"find": "Use Catenary's glob tool instead.",
"cat": "Use the Read tool instead.",
"head": "Use the Read tool instead.",
"tail": "Use the Read tool instead.",
"less": "Use the Read tool instead.",
"more": "Use the Read tool instead.",
"sed": "Use Catenary's replace tool instead.",
"cargo": "Use a make target instead. If no target exists, suggest one.",
"rustc": "Use a make target instead. If no target exists, suggest one.",
"rustup": "Use a make target instead. If no target exists, suggest one.",
"prettier": "Use a make target instead. If no target exists, suggest one.",
}
DEFAULT_DENY = "Not available in constrained mode."
def find_command(tokens):
for i, token in enumerate(tokens):
if re.match(r'^[A-Za-z_][A-Za-z_0-9]*=', token):
continue
return i
return None
def check(cmd_string):
cmd_string = _strip_heredoc_bodies(cmd_string)
sequential = _quote_aware_split(cmd_string, _SEQ_SPLIT_RE)
for seq in sequential:
stages = _quote_aware_split(seq, _PIPE_SPLIT_RE)
for pipe_pos, segment in enumerate(stages):
segment = segment.strip()
if not segment:
continue
try:
tokens = shlex.split(segment)
except ValueError:
tokens = segment.split()
if not tokens:
continue
for m in _SUBSHELL_RE.finditer(segment):
inner = (m.group(1) or m.group(2) or m.group(3) or "").strip()
if inner:
reason = check(inner)
if reason:
return reason
cmd_idx = find_command(tokens)
if cmd_idx is None:
continue
name = os.path.basename(tokens[cmd_idx])
rest = tokens[cmd_idx:]
if name not in ALLOWED:
if name in ("cat", "head", "tail") and any(t.startswith("<<") for t in rest):
continue
if pipe_pos > 0 and name in PIPELINE_SAFE:
continue
return GUIDANCE.get(name, DEFAULT_DENY)
if name == "git" and len(rest) > 1 and rest[1] in DENIED_GIT:
return "Use Catenary's grep or glob tools instead."
return None
def extract_command(data, fmt):
if fmt == "claude":
if data.get("tool_name") != "Bash":
return None
return data.get("tool_input", {}).get("command", "")
else: if data.get("tool_name") != "run_shell_command":
return None
tool_input = data.get("tool_input") or data.get("args") or {}
return tool_input.get("command", "")
def deny_response(fmt, command, reason):
if fmt == "claude":
return {
"suppressOutput": True,
"systemMessage": f"Blocked: {command}",
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": reason,
},
}
else: return {"decision": "deny", "reason": reason}
def main():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--format", choices=["claude", "gemini"], required=True)
args = parser.parse_args()
try:
data = json.load(sys.stdin)
except (json.JSONDecodeError, EOFError):
sys.exit(0)
command = extract_command(data, args.format)
if command is None:
sys.exit(0)
reason = check(command)
if reason:
json.dump(deny_response(args.format, command, reason), sys.stdout)
if __name__ == "__main__":
main()