from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
# Contraband rule table. Each rule is a dict with:
#   "name"     - identifier accepted by --only and shown next to each finding
#   "patterns" - (label, compiled regex) pairs searched in every file's text
#   "message"  - rationale printed once per rule in the failure breakdown
RULES = [
    {
        # Despite its name this rule also lists XML crates. Every banned
        # crate is matched both as a `use` import and as a fully-qualified
        # `crate::` path, so code that skips the `use` is still caught.
        "name": "no_yaml",
        "patterns": [
            ("use serde_yaml", re.compile(r"\buse\s+serde_yaml\b")),
            ("use serde_yml", re.compile(r"\buse\s+serde_yml\b")),
            ("use yaml_rust", re.compile(r"\buse\s+yaml_rust\b")),
            ("use quick_xml", re.compile(r"\buse\s+quick_xml\b")),
            ("use xml_rs", re.compile(r"\buse\s+xml_rs\b")),
            ("serde_yaml::*", re.compile(r"\bserde_yaml\s*::")),
            ("serde_yml::*", re.compile(r"\bserde_yml\s*::")),
            ("yaml_rust::*", re.compile(r"\byaml_rust\s*::")),
            ("quick_xml::*", re.compile(r"\bquick_xml\s*::")),
            ("xml_rs::*", re.compile(r"\bxml_rs\s*::")),
        ],
        "message": ("config goes through env vars; "
                    "no schema'd files in core source"),
    },
    {
        "name": "no_toml_parse",
        "patterns": [
            ("toml::from_str", re.compile(r"\btoml\s*::\s*from_str\b")),
            ("toml::to_string", re.compile(r"\btoml\s*::\s*to_string\b")),
            ("toml::de::*", re.compile(r"\btoml\s*::\s*de\s*::")),
            ("toml::ser::*", re.compile(r"\btoml\s*::\s*ser\s*::")),
            ("use toml::", re.compile(r"\buse\s+toml\s*::")),
        ],
        "message": ("Cargo.toml is fine; src/*.rs must not parse TOML at runtime"),
    },
    {
        "name": "no_last_modified",
        "patterns": [
            # The string-literal form is matched case-insensitively.
            ('"last-modified" string',
             re.compile(r'"last-modified"', re.IGNORECASE)),
            ("header::LAST_MODIFIED",
             re.compile(r"\bheader\s*::\s*LAST_MODIFIED\b")),
            # The lookbehind rejects an identifier directly preceded by ':',
            # so `header::LAST_MODIFIED` is left to the pattern above.
            ("LAST_MODIFIED identifier",
             re.compile(r"(?<!:)\bLAST_MODIFIED\b")),
        ],
        "message": ("Last-Modified bypasses the audit chain as a freshness signal; "
                    "remove it from any DEFAULT_PERSIST_HEADERS list"),
    },
    {
        "name": "no_hardcoded_secrets",
        "patterns": [
            ('Bearer <hardcoded>',
             re.compile(r'"Bearer\s+[A-Za-z0-9._\-]+"')),
            ('Authorization: Bearer literal',
             re.compile(r'"Authorization:\s*Bearer\s+\S+"', re.IGNORECASE)),
            # Only quoted values of 8+ characters are flagged.
            ('api_key = "..."',
             re.compile(r'\bapi[_-]?key\s*[:=]\s*"[^"]{8,}"', re.IGNORECASE)),
            ('secret = "..."',
             re.compile(r'\bsecret\s*[:=]\s*"[^"]{8,}"', re.IGNORECASE)),
        ],
        "message": ("read tokens/keys from env or boot config; "
                    "never inline a literal secret in source"),
    },
]
# Path components that exclude a file from scanning: should_skip() rejects
# any path that has one of these names among its parts.
SKIP_DIR_PARTS = {
    ".cache",
    ".cargo",
    ".git",
    "deps",
    "incremental",
    "node_modules",
    "target",
}

# File names that should_skip() rejects outright.
# NOTE(review): collect_files() only hands *.rs paths to should_skip(), so
# these .py names look unreachable from here -- confirm before relying on them.
SKIP_FILE_NAMES = {
    "contraband.py",
    "no_json.py",
}
def should_skip(path: Path) -> bool:
    """Tell whether *path* is excluded from scanning.

    A path is excluded when its file name is listed in SKIP_FILE_NAMES, or
    when at least one of its components is listed in SKIP_DIR_PARTS.
    """
    name_excluded = path.name in SKIP_FILE_NAMES
    # "Not disjoint" means some component of the path is a skipped name.
    return name_excluded or not SKIP_DIR_PARTS.isdisjoint(path.parts)
def line_of(src: str, idx: int) -> int:
    """Return the 1-based line number of character offset *idx* in *src*."""
    # Every newline strictly before idx pushes the offset one line down.
    newlines_before = src.count("\n", 0, idx)
    return newlines_before + 1
def scan_file(path: Path, rules: list) -> list[tuple[str, int, str]]:
    """Search one file for every pattern of every rule.

    Args:
        path: File to read; decoded as UTF-8 with undecodable bytes replaced.
        rules: Rule dicts carrying a "name" and a "patterns" list of
            (label, compiled regex) pairs.

    Returns:
        (rule name, 1-based line, label) tuples, at most one per distinct
        combination, ordered by line and then rule name. If the file cannot
        be read, a single ("read_error", 0, "read failed: ...") entry is
        returned instead.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except Exception as exc:
        return [("read_error", 0, f"read failed: {exc}")]

    hits = []
    recorded = set()
    for rule in rules:
        for label, regex in rule["patterns"]:
            for match in regex.finditer(text):
                hit = (rule["name"], line_of(text, match.start()), label)
                # Several matches of one pattern on a line count once.
                if hit not in recorded:
                    recorded.add(hit)
                    hits.append(hit)
    # The sort is stable, so pattern order is kept within a (line, rule) pair.
    return sorted(hits, key=lambda hit: (hit[1], hit[0]))
def collect_files(targets: list[Path]) -> tuple[list[Path], list[Path]]:
    """Expand the requested targets into the list of Rust files to scan.

    Args:
        targets: Files and/or directories named by the caller.

    Returns:
        A ``(files, missing)`` pair. ``files`` holds every ``.rs`` file that
        passes should_skip(): explicit file targets as given, directory
        targets expanded recursively in sorted order. ``missing`` holds the
        targets that do not exist. Existing files without an ``.rs`` suffix
        are ignored.
    """
    files: list[Path] = []
    missing: list[Path] = []
    for t in targets:
        if not t.exists():
            missing.append(t)
            continue
        if t.is_file() and t.suffix == ".rs":
            # Explicit file targets are checked with the path as given.
            if not should_skip(t):
                files.append(t)
        elif t.is_dir():
            # Apply the skip list to the path *below* the scan root only.
            # Checking the full path would discard every file whenever an
            # ancestor of the root happens to be called e.g. "target",
            # "deps" or ".cache" -- which is easy to hit with the absolute
            # default root.
            files.extend(
                f for f in sorted(t.rglob("*.rs"))
                if not should_skip(f.relative_to(t))
            )
    return files, missing
def main(argv: list[str]) -> int:
    """Run the scan from the command line and return a process exit code.

    Args:
        argv: Full argument vector in ``sys.argv`` style; the program name
            at index 0 is dropped and only ``argv[1:]`` is parsed.

    Returns:
        0 when the scan is clean, 1 when at least one file has findings
        (unreadable files count as findings), 2 for usage problems: an
        unknown ``--only`` rule, a nonexistent path, or no ``.rs`` files.
    """
    # __doc__ is None when the module has no docstring or runs under -OO;
    # fall back to no description instead of crashing before parsing args.
    doc_lines = (__doc__ or "").splitlines()
    ap = argparse.ArgumentParser(
        description=doc_lines[0] if doc_lines else None)
    ap.add_argument("paths", nargs="*",
                    help="files or directories to scan")
    ap.add_argument("--only", action="append", default=[],
                    help="run only these rules (repeatable). "
                         "Available: %s"
                         % ", ".join(r["name"] for r in RULES))
    args = ap.parse_args(argv[1:])

    rules = RULES
    if args.only:
        keep = set(args.only)
        unknown = keep - {r["name"] for r in RULES}
        if unknown:
            # Usage errors go to stderr like the other exit-2 paths below.
            print("unknown rule(s): %s" % ", ".join(sorted(unknown)),
                  file=sys.stderr)
            return 2
        rules = [r for r in RULES if r["name"] in keep]

    # With no explicit paths, scan the directory that contains this script.
    targets = ([Path(p) for p in args.paths]
               if args.paths
               else [Path(__file__).resolve().parent])
    files, missing = collect_files(targets)
    if missing:
        for t in missing:
            print(f"contraband: cannot stat {t} (does not exist)",
                  file=sys.stderr)
        return 2
    if not files:
        print("contraband: no .rs files found", file=sys.stderr)
        return 2

    # Keep only the files that produced at least one finding.
    by_file: dict[Path, list[tuple[str, int, str]]] = {}
    for f in files:
        v = scan_file(f, rules)
        if v:
            by_file[f] = v
    if not by_file:
        print("contraband: clean (%d rust files scanned, %d rules)"
              % (len(files), len(rules)))
        return 0

    by_rule: dict[str, int] = {}
    total = 0
    for path in sorted(by_file):
        print("\n%s" % path.as_posix())
        for rule_name, line, label in by_file[path]:
            print(" line %d [%s] %s" % (line, rule_name, label))
            by_rule[rule_name] = by_rule.get(rule_name, 0) + 1
            total += 1
    print("\n[FAIL] %d files, %d violations across %d rules"
          % (len(by_file), total, len(by_rule)))
    print(" breakdown:")
    rule_lookup = {r["name"]: r for r in rules}
    for name in sorted(by_rule):
        # scan_file() reports unreadable files under the pseudo-rule
        # "read_error", which has no entry in the rule table; indexing the
        # lookup directly would raise KeyError for it.
        rule = rule_lookup.get(name)
        message = rule["message"] if rule else "file could not be read"
        print(" %-22s %d -- %s" % (name, by_rule[name], message))
    return 1
if __name__ == "__main__":
    # Script entry point: pass the full argv to main() and exit with its code.
    raise SystemExit(main(sys.argv))