from __future__ import annotations
import json
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
# Ratchet state file: records the maximum allowed number of nocov annotations.
RATCHET_FILE = Path(".github/coverage-ratchet.json")
# Rust source trees scanned for annotations and coverage exclusions.
SOURCE_DIRS = [Path("src"), Path("hegel-macros/src")]
# Per-file cache: resolved path -> line numbers inside nocov blocks.
_nocov_block_cache: dict[Path, set[int]] = {}


def get_nocov_block_lines(file_path: Path) -> set[int]:
    """Return 1-based line numbers between ``// nocov start`` and ``// nocov end``.

    The marker lines themselves are not included. Results are cached per
    resolved path; an unreadable file yields (and caches) an empty set.
    """
    resolved = file_path.resolve()
    cached = _nocov_block_cache.get(resolved)
    if cached is not None:
        return cached
    # Compile once instead of re-matching raw patterns on every line.
    start_re = re.compile(r"//\s*nocov\s+start\b")
    end_re = re.compile(r"//\s*nocov\s+end\b")
    excluded: set[int] = set()
    in_block = False
    try:
        with file_path.open() as f:
            for lineno, text in enumerate(f, 1):
                if start_re.search(text):
                    in_block = True
                elif end_re.search(text):
                    in_block = False
                elif in_block:
                    excluded.add(lineno)
    except OSError:
        # Unreadable file: treat as having no nocov blocks.
        pass
    _nocov_block_cache[resolved] = excluded
    return excluded
# Per-file cache: resolved path -> line numbers inside #[cfg(windows)] items.
_cfg_windows_cache: dict[Path, set[int]] = {}


def get_cfg_windows_lines(file_path: Path) -> set[int]:
    """Return 1-based line numbers of code guarded by a bare ``#[cfg(windows)]``.

    For each attribute line, the following braced item (from its first ``{``
    through the matching ``}``) is excluded. A brace-less item such as
    ``#[cfg(windows)] use ...;`` is excluded up to its terminating ``;``
    (previously the scan ran past such items and mis-attributed the next
    unrelated brace block as windows-only). Results are cached per resolved
    path; an unreadable file yields (and caches) an empty set.
    """
    resolved = file_path.resolve()
    if resolved in _cfg_windows_cache:
        return _cfg_windows_cache[resolved]
    excluded: set[int] = set()
    try:
        with file_path.open() as f:
            lines = f.readlines()
    except OSError:
        _cfg_windows_cache[resolved] = excluded
        return excluded
    for i, raw in enumerate(lines):
        if raw.strip() != "#[cfg(windows)]":
            continue
        brace_depth = 0
        started = False
        for j in range(i + 1, len(lines)):
            terminated = False
            for ch in lines[j]:
                if ch == "{":
                    brace_depth += 1
                    started = True
                elif ch == "}":
                    brace_depth -= 1
                elif ch == ";" and not started:
                    # A `;` before any `{` means a brace-less item ends here.
                    terminated = True
                    break
            if started or terminated:
                excluded.add(j + 1)
            if terminated or (started and brace_depth == 0):
                break
    _cfg_windows_cache[resolved] = excluded
    return excluded
@dataclass
class UncoveredLine:
    """A single source line reported as unexecuted by coverage.

    Carries the heuristics used to decide whether the miss is an allowable
    pattern (structural punctuation, test code, placeholder macros, nocov
    annotations, windows-only code) or real untested code.
    """

    file: Path  # source file the line belongs to
    line_number: int  # 1-based line number within `file`
    content: str  # raw text of the line

    def _read_lines(self) -> list[str]:
        """Read the whole source file; empty list when unreadable."""
        try:
            with self.file.open() as f:
                return f.readlines()
        except OSError:
            return []

    def is_structural_syntax_only(self) -> bool:
        """True when the line holds only closing/punctuation tokens."""
        stripped = self.content.strip()
        # Drop braces/brackets/semicolons/commas; anything left is real code.
        cleaned = re.sub(r"[})\];,\s]", "", stripped)
        return len(cleaned) == 0 and len(stripped) > 0

    def is_todo_placeholder(self) -> bool:
        """True when the line is a bare single-line ``todo!(...)`` statement."""
        stripped = self.content.strip()
        return bool(re.match(r"^todo!\s*\([^)]*\)\s*;?\s*$", stripped))

    def is_unreachable_placeholder(self) -> bool:
        """True for ``unreachable!(...)``, including a multi-line opening."""
        stripped = self.content.strip()
        # Complete single-line invocation.
        if re.search(r"unreachable!\s*\(.*\)\s*[;,]?\s*$", stripped):
            return True
        # Opening of a multi-line invocation: macro starts but the line does
        # not end with a closing paren (ignoring a trailing `;`).
        if re.search(r"unreachable!\s*\(", stripped) and not stripped.rstrip(
            ";"
        ).rstrip().endswith(")"):
            return True
        return False

    def is_inside_excluded_macro(self) -> bool:
        """True when this line continues an unreachable!/todo!/assert! call.

        Scans up to 20 lines above for a macro opening, then checks whether
        its parentheses are still unbalanced when this line is reached.
        """
        lines = self._read_lines()
        idx = self.line_number - 1
        for start in range(idx - 1, max(idx - 20, -1), -1):
            if start < 0 or start >= len(lines):
                continue
            line = lines[start].strip()
            if re.search(r"\b(unreachable|todo|assert)!\s*\(", line):
                paren_depth = 0
                for check_idx in range(start, idx):
                    for ch in lines[check_idx]:
                        if ch == "(":
                            paren_depth += 1
                        elif ch == ")":
                            paren_depth -= 1
                # Still-open parens mean this line is inside the macro call.
                return paren_depth > 0
        return False

    def has_nocov_annotation(self) -> bool:
        """True when annotated inline or inside a ``// nocov start`` block."""
        if re.search(r"//\s*nocov\b", self.content):
            return True
        return self.line_number in get_nocov_block_lines(self.file)

    def is_test_code(self) -> bool:
        """Heuristic: does the line sit inside a ``#[cfg(test)]`` module?

        Walks upward counting braces; each time an enclosing ``{`` is found,
        checks a few lines above it for the cfg(test) attribute.
        """
        lines = self._read_lines()
        idx = self.line_number - 1
        brace_depth = 0
        for i in range(idx, -1, -1):
            line = lines[i].strip() if i < len(lines) else ""
            brace_depth += line.count("}")
            brace_depth -= line.count("{")
            if brace_depth < 0:
                # Found an enclosing open brace; look just above it.
                for j in range(i, max(i - 5, -1), -1):
                    check_line = lines[j].strip() if j < len(lines) else ""
                    if "#[cfg(test)]" in check_line:
                        return True
                # Not a test module; keep climbing to outer scopes.
                brace_depth = 0
        return False

    def is_windows_only(self) -> bool:
        """True when the line lies inside a ``#[cfg(windows)]`` item."""
        return self.line_number in get_cfg_windows_lines(self.file)

    def is_ignored_test_body(self) -> bool:
        """True when the line is in the body of an ``#[ignore]``d test fn."""
        lines = self._read_lines()
        idx = self.line_number - 1
        for i in range(idx, max(idx - 50, -1), -1):
            line = lines[i].strip() if i < len(lines) else ""
            if line.startswith("fn "):
                # Check the few attribute lines above the fn declaration.
                for j in range(i - 1, max(i - 4, -1), -1):
                    attr = lines[j].strip() if j < len(lines) else ""
                    if attr.startswith("#[ignore"):
                        return True
                return False
        return False
@dataclass
class CoverageData:
    """Aggregated result of parsing an lcov trace file."""

    # Lines with an execution count of zero, in the order encountered.
    uncovered_lines: list[UncoveredLine] = field(default_factory=list)
    # Executed (count > 0) line numbers, keyed by source file path.
    covered_lines: dict[Path, set[int]] = field(default_factory=dict)
def get_target_triple() -> str:
    """Return the host target triple reported by ``rustc -vV``.

    Falls back to ``"unknown"`` when no ``host:`` line is present.
    """
    probe = subprocess.run(
        ["rustc", "-vV"],
        capture_output=True,
        text=True,
    )
    host_line = next(
        (ln for ln in probe.stdout.splitlines() if ln.startswith("host:")),
        None,
    )
    if host_line is None:
        return "unknown"
    return host_line.split(":")[1].strip()
def run_coverage() -> Path:
    """Run the instrumented test suite and produce ``lcov.info``.

    Cleans old coverage data, runs ``cargo llvm-cov --no-report``, then tries
    a manual llvm-profdata merge + llvm-cov export so that profiles from the
    spawned ``temp_hegel_test_*`` subprocess binaries are included. Falls back
    to the standard ``cargo llvm-cov report`` path when that is not possible.
    Calls ``sys.exit(1)`` on any unrecoverable failure; returns the path to
    the generated lcov file.
    """
    print("Running coverage analysis...")
    lcov_path = Path("lcov.info")
    # Start from a clean slate so stale profiles don't skew the report.
    print(" Cleaning previous coverage data...")
    result = subprocess.run(
        ["cargo", "llvm-cov", "clean", "--workspace"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        if result.stderr:
            print(result.stderr, file=sys.stderr)
        print("ERROR: Failed to clean coverage data", file=sys.stderr)
        sys.exit(1)
    # --no-report: collect .profraw data now; the report is generated below.
    print(" Running tests with coverage...")
    result = subprocess.run(
        ["cargo", "llvm-cov", "--no-report", "--all-features"],
        capture_output=True,
        text=True,
    )
    if result.stdout:
        print(result.stdout)
    if result.returncode != 0:
        if result.stderr:
            print(result.stderr, file=sys.stderr)
        print("ERROR: Coverage run failed", file=sys.stderr)
        sys.exit(1)
    print(" Tests passed")
    llvm_cov_target = Path("target/llvm-cov-target")
    # Extensionless entries only: skips .d and similar build artifacts that
    # sit next to the actual executables.
    subprocess_bins = sorted(
        p
        for p in llvm_cov_target.glob("debug/temp_hegel_test_*")
        if p.is_file() and not p.suffix
    )
    print(f" Generating report ({len(subprocess_bins)} subprocess binaries)...")
    if subprocess_bins:
        # Locate the toolchain's bundled llvm-profdata/llvm-cov so the
        # subprocess binaries' profiles can be merged manually.
        toolchain_result = subprocess.run(
            ["rustc", "--print", "sysroot"],
            capture_output=True,
            text=True,
        )
        sysroot = toolchain_result.stdout.strip()
        llvm_bin = Path(sysroot) / "lib/rustlib" / get_target_triple() / "bin"
        llvm_profdata = llvm_bin / "llvm-profdata"
        llvm_cov_bin = llvm_bin / "llvm-cov"
        if llvm_profdata.exists() and llvm_cov_bin.exists():
            profraw_files = list(llvm_cov_target.glob("*.profraw"))
            merged_profdata = llvm_cov_target / "merged.profdata"
            result = subprocess.run(
                [str(llvm_profdata), "merge", "-sparse"]
                + [str(f) for f in profraw_files]
                + ["-o", str(merged_profdata)],
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                print(
                    "WARNING: profdata merge failed, using standard report",
                    file=sys.stderr,
                )
            else:
                # Every binary that contributed counters must be listed in
                # the export: main crate binaries, subprocess helpers, tests.
                main_bins = sorted(
                    p
                    for p in llvm_cov_target.glob("debug/deps/hegel-*")
                    if p.is_file() and not p.suffix
                )
                all_bins = main_bins + subprocess_bins
                test_bins = sorted(
                    p
                    for p in llvm_cov_target.glob("debug/deps/test_*")
                    if p.is_file() and not p.suffix
                )
                all_bins.extend(test_bins)
                if all_bins:
                    cmd = [
                        str(llvm_cov_bin),
                        "export",
                        "-format=lcov",
                        f"-instr-profile={merged_profdata}",
                    ]
                    # First binary is positional; the rest use -object.
                    cmd.append(str(all_bins[0]))
                    for b in all_bins[1:]:
                        cmd.extend(["-object", str(b)])
                    # Keep third-party, toolchain, temp, and test sources
                    # out of the report.
                    cmd.extend(
                        [
                            "-ignore-filename-regex=\\.cargo/registry",
                            "-ignore-filename-regex=/rustc/",
                            "-ignore-filename-regex=/rustlib/",
                            "-ignore-filename-regex=/tmp/",
                            "-ignore-filename-regex=/var/folders/",
                            "-ignore-filename-regex=tests/",
                        ]
                    )
                    result = subprocess.run(cmd, capture_output=True, text=True)
                    if result.returncode == 0 and result.stdout:
                        lcov_path.write_text(result.stdout)
                        return lcov_path
                    else:
                        print(
                            "WARNING: llvm-cov export failed, using standard report",
                            file=sys.stderr,
                        )
                        if result.stderr:
                            print(result.stderr[:500], file=sys.stderr)
    # Fallback: let cargo llvm-cov generate the lcov report itself.
    result = subprocess.run(
        ["cargo", "llvm-cov", "report", "--lcov", f"--output-path={lcov_path}"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        if result.stderr:
            print(result.stderr, file=sys.stderr)
        print("ERROR: Coverage report generation failed", file=sys.stderr)
        sys.exit(1)
    if not lcov_path.exists():
        print("ERROR: lcov.info was not generated", file=sys.stderr)
        sys.exit(1)
    return lcov_path
def get_line_content(file_path: Path, line_number: int) -> str:
    """Return the text of 1-based ``line_number`` without its trailing newline.

    Returns an empty string when the file is unreadable or the line does
    not exist.
    """
    try:
        with file_path.open() as f:
            for current, text in enumerate(f, 1):
                if current != line_number:
                    continue
                return text.rstrip("\n")
    except OSError:
        pass
    return ""
def parse_lcov(lcov_path: Path) -> CoverageData:
    """Parse an lcov trace file into covered / uncovered line records."""
    data = CoverageData()
    active: Path | None = None
    da_re = re.compile(r"DA:(\d+),(\d+)")
    with lcov_path.open() as f:
        for raw in f:
            record = raw.strip()
            if record == "end_of_record":
                active = None
            elif record.startswith("SF:"):
                active = Path(record[3:])
            elif active is not None and record.startswith("DA:"):
                m = da_re.match(record)
                if not m:
                    continue
                lineno = int(m.group(1))
                hits = int(m.group(2))
                if hits == 0:
                    data.uncovered_lines.append(
                        UncoveredLine(active, lineno, get_line_content(active, lineno))
                    )
                else:
                    data.covered_lines.setdefault(active, set()).add(lineno)
    return data
def find_line_annotations() -> list[tuple[Path, int, str]]:
    """Collect inline ``// nocov`` annotations, excluding block markers.

    Returns (file, 1-based line number, raw line text) tuples for every
    annotated line under SOURCE_DIRS.
    """
    inline = re.compile(r"//\s*nocov\b")
    marker = re.compile(r"//\s*nocov\s+(start|end)\b")
    found: list[tuple[Path, int, str]] = []
    for src_dir in SOURCE_DIRS:
        if not src_dir.exists():
            continue
        for rs_file in sorted(src_dir.rglob("*.rs")):
            try:
                with rs_file.open() as f:
                    for lineno, text in enumerate(f, 1):
                        if inline.search(text) and not marker.search(text):
                            found.append((rs_file, lineno, text))
            except OSError:
                continue
    return found
def remove_annotation_from_line(line: str) -> str:
    """Strip a trailing ``// nocov`` annotation, preserving any newline."""
    had_newline = line.endswith("\n")
    stripped = re.sub(r"\s*//\s*nocov\b.*$", "", line.rstrip("\n"))
    return stripped + "\n" if had_newline else stripped
def cleanup_unnecessary_annotations(coverage: CoverageData) -> int:
    """Delete inline ``// nocov`` annotations on lines coverage proves executed.

    Returns the number of annotations removed.
    """
    removed = 0
    to_strip: dict[Path, set[int]] = {}
    for path, lineno, _text in find_line_annotations():
        if lineno in coverage.covered_lines.get(path, set()):
            to_strip.setdefault(path, set()).add(lineno)
            removed += 1
    for path, line_numbers in to_strip.items():
        try:
            with path.open() as f:
                contents = f.readlines()
            for lineno in line_numbers:
                idx = lineno - 1
                if idx < len(contents):
                    contents[idx] = remove_annotation_from_line(contents[idx])
            with path.open("w") as f:
                f.writelines(contents)
        except OSError as e:
            print(f"WARNING: Could not modify {path}: {e}", file=sys.stderr)
    return removed
def count_annotations() -> int:
    """Count lines covered by ``// nocov`` annotations.

    Counts inline annotations and every line inside a start/end block,
    skipping lines already excluded as ``#[cfg(windows)]`` code.
    """
    inline = re.compile(r"//\s*nocov\b")
    start = re.compile(r"//\s*nocov\s+start\b")
    end = re.compile(r"//\s*nocov\s+end\b")
    total = 0
    for src_dir in SOURCE_DIRS:
        if not src_dir.exists():
            continue
        for rs_file in sorted(src_dir.rglob("*.rs")):
            try:
                windows_lines = get_cfg_windows_lines(rs_file)
                in_block = False
                with rs_file.open() as f:
                    for lineno, text in enumerate(f, 1):
                        if start.search(text):
                            in_block = True
                        elif end.search(text):
                            in_block = False
                        elif lineno in windows_lines:
                            # Windows-only code is excluded elsewhere.
                            pass
                        elif in_block or inline.search(text):
                            total += 1
            except OSError:
                continue
    return total
def read_ratchet() -> int | float:
    """Read the stored nocov limit; infinity when absent or unreadable."""
    if not RATCHET_FILE.exists():
        return float("inf")
    try:
        payload = json.loads(RATCHET_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return float("inf")
    return payload.get("nocov", 0)
def write_ratchet(nocov: int) -> None:
    """Persist the nocov count as the new ratchet limit."""
    RATCHET_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({"nocov": nocov}, indent=2) + "\n"
    RATCHET_FILE.write_text(payload)
def check_uncovered_lines(uncovered: list[UncoveredLine]) -> int:
    """Classify uncovered lines and report; return 1 if real code lacks tests.

    Allowable categories (structural syntax, test code, placeholders, macro
    continuations, nocov annotations, ignored tests, windows-only code) are
    tallied; everything else is reported as code requiring tests.
    """

    def classify(line: UncoveredLine) -> str:
        # Order matters: cheaper/stronger checks run first.
        if line.is_structural_syntax_only():
            return "structural"
        if line.is_test_code():
            return "test_code"
        if line.is_todo_placeholder() or line.is_unreachable_placeholder():
            return "placeholders"
        if line.is_inside_excluded_macro():
            return "macro"
        if line.has_nocov_annotation():
            return "nocov"
        if line.is_ignored_test_body():
            return "ignored"
        if line.is_windows_only():
            return "windows"
        return "actual"

    buckets: dict[str, list[UncoveredLine]] = {
        name: []
        for name in (
            "structural",
            "test_code",
            "placeholders",
            "macro",
            "nocov",
            "ignored",
            "windows",
            "actual",
        )
    }
    for line in uncovered:
        buckets[classify(line)].append(line)
    actual = buckets["actual"]
    print()
    print("Line Coverage Analysis")
    print("======================")
    print()
    print(f"Uncovered closing braces (allowed): {len(buckets['structural'])}")
    print(f"Uncovered #[cfg(test)] code (allowed): {len(buckets['test_code'])}")
    print(f"Uncovered todo!/unreachable! (allowed): {len(buckets['placeholders'])}")
    print(f"Uncovered macro continuations (allowed): {len(buckets['macro'])}")
    print(f"Uncovered #[ignore]d test bodies (allowed): {len(buckets['ignored'])}")
    print(f"Uncovered #[cfg(windows)] code (allowed): {len(buckets['windows'])}")
    print(f"Uncovered // nocov lines (allowed): {len(buckets['nocov'])}")
    print(f"Uncovered code lines: {len(actual)}")
    print()
    if not actual:
        print("All uncovered lines are allowable patterns.")
        return 0
    print("Found uncovered CODE that requires tests:")
    print()
    for line in actual:
        try:
            rel_path = line.file.relative_to(Path.cwd())
        except ValueError:
            rel_path = line.file
        print(f" {rel_path}:{line.line_number}: {line.content.strip()}")
    print()
    print("Add tests for the uncovered code, or if truly untestable,")
    print("add a // nocov annotation.")
    return 1
def main() -> int:
    """Run the full coverage + ratchet pipeline; return the exit code.

    Runs coverage, checks uncovered lines, removes annotations that have
    become unnecessary, then enforces (and possibly tightens) the nocov
    annotation ratchet.
    """
    coverage = parse_lcov(run_coverage())
    if not coverage.uncovered_lines:
        print("\n100% line coverage -- no uncovered lines at all!")
    else:
        status = check_uncovered_lines(coverage.uncovered_lines)
        if status != 0:
            return status
    removed = cleanup_unnecessary_annotations(coverage)
    if removed > 0:
        print(f"\nRemoved {removed} unnecessary // nocov annotations")
    nocov_count = count_annotations()
    print(f"\nCoverage annotations: {nocov_count} // nocov")
    limit = read_ratchet()
    if nocov_count > limit:
        print("\nCoverage annotation ratchet EXCEEDED!")
        print(f" // nocov: {nocov_count} (limit: {limit})")
        print()
        print("The nocov ratchet may not be increased. Remove the")
        print("annotations or add tests to cover the code.")
        return 1
    ratchet_changed = False
    if nocov_count < limit:
        # A first run has no stored limit (infinity): report it as "none".
        previous = "none" if limit == float("inf") else limit
        print(f" Ratchet tightened: nocov {previous} -> {nocov_count}")
        write_ratchet(nocov_count)
        ratchet_changed = True
    elif not RATCHET_FILE.exists():
        write_ratchet(nocov_count)
        ratchet_changed = True
    if ratchet_changed:
        print(f" Updated {RATCHET_FILE}")
    print("\nCoverage check PASSED!")
    return 0
# Script entry point: propagate the check's status as the process exit code.
if __name__ == "__main__":
    sys.exit(main())