import sys
import os
import re
import argparse
import subprocess
from pathlib import Path
from typing import Optional, Dict, List
def check_llvm_cov_installed() -> bool:
    """Report whether the ``cargo llvm-cov`` subcommand can be invoked.

    Runs ``cargo llvm-cov --version`` with a 10 second timeout and captures
    its output so nothing is echoed to the terminal.

    Returns:
        True when the command exits with status 0; False when it exits with
        a non-zero status, times out, or the executable cannot be found.
    """
    probe = ['cargo', 'llvm-cov', '--version']
    try:
        completed = subprocess.run(probe, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return completed.returncode == 0
def parse_coverage_line(line: str) -> Optional[Dict[str, float]]:
    """Extract the summary percentages from a report line containing ``TOTAL``.

    The line is split on whitespace and must yield at least 13 fields. The
    region, function and line percentages are read from fields 3, 6 and 9
    (0-based) after removing any trailing ``%``.

    Args:
        line: One line of coverage report text.

    Returns:
        A dict with ``'region'``, ``'function'`` and ``'line'`` floats, or
        None when the line lacks the ``TOTAL`` marker, has too few fields,
        or one of the percentage fields is not numeric.
    """
    if 'TOTAL' not in line:
        return None

    fields = line.split()
    if len(fields) < 13:
        return None

    try:
        region_pct, function_pct, line_pct = (
            float(fields[idx].rstrip('%')) for idx in (3, 6, 9)
        )
    except (ValueError, IndexError):
        return None

    return {'region': region_pct, 'function': function_pct, 'line': line_pct}
def parse_file_coverage(lines: List[str]) -> Dict[str, Dict[str, float]]:
    """Collect per-file line coverage from the rows of a coverage table.

    Rows that start with ``Filename``, ``TOTAL`` or ``-`` are skipped, as are
    rows with fewer than 13 whitespace-separated fields or whose field 9
    (0-based) is not a number once a trailing ``%`` is removed.

    Args:
        lines: The report text, one entry per line.

    Returns:
        A mapping from the first field of each accepted row (the file name)
        to ``{'line': <percentage>}``.
    """
    skipped_prefixes = ('Filename', 'TOTAL', '-')
    per_file = {}
    for row in lines:
        if row.startswith(skipped_prefixes):
            continue
        fields = row.split()
        if len(fields) < 13:
            continue
        try:
            # The right-hand side is evaluated first, so a non-numeric
            # percentage leaves the mapping untouched.
            per_file[fields[0]] = {'line': float(fields[9].rstrip('%'))}
        except (ValueError, IndexError):
            continue
    return per_file
def parse_uncovered_lines(lines: List[str]) -> Dict[str, List[int]]:
    """Parse the ``Uncovered Lines:`` section of a coverage report.

    Everything before the first line containing ``Uncovered Lines:`` is
    ignored. After it, each line of the form ``<path>: <n>, <a>-<b>, ...``
    contributes the listed line numbers for that path; ``a-b`` ranges are
    expanded inclusively and tokens that are not numbers or ranges are
    skipped.

    Args:
        lines: The report text, one entry per line.

    Returns:
        A mapping from path to the list of uncovered line numbers. Paths for
        which no number could be parsed are omitted.
    """
    uncovered: Dict[str, List[int]] = {}
    in_uncovered_section = False
    for line in lines:
        if 'Uncovered Lines:' in line:
            in_uncovered_section = True
            continue
        if not in_uncovered_section or ':' not in line:
            continue

        # Split at the LAST colon: the line-number list never contains one,
        # whereas the path may (for example a Windows drive letter).
        filepath, _, line_nums_str = line.rpartition(':')
        filepath = filepath.strip()

        line_nums: List[int] = []
        for part in line_nums_str.split(','):
            part = part.strip()
            try:
                if '-' in part:
                    start, end = part.split('-')
                    line_nums.extend(range(int(start), int(end) + 1))
                else:
                    line_nums.append(int(part))
            except ValueError:
                # Neither a number nor a "start-end" range; ignore the token.
                continue

        if line_nums:
            uncovered[filepath] = line_nums
    return uncovered
def run_coverage(crate_path: Path, test_filter: Optional[str], html: bool, show_files: bool, include_integration: bool, show_uncovered: bool, timeout: int) -> Optional[str]:
    """Run ``cargo llvm-cov --lib`` inside *crate_path* and return its stdout.

    Args:
        crate_path: Directory used as the working directory for the command.
        test_filter: When set, appended after a ``--`` separator.
        html: Add ``--html`` to the command.
        show_files: When neither *html* nor *show_uncovered* is set, a false
            value here adds ``--summary-only``.
        include_integration: Add ``--tests``; when no *test_filter* is given
            also append ``-- --test-threads=1``.
        show_uncovered: Add ``--show-missing-lines`` (only if *html* is off).
        timeout: Seconds to wait for the command before giving up.

    Returns:
        The captured standard output, or None when the crate path does not
        exist, the command times out, any other exception is raised, or the
        command fails with one of the recognised fatal markers in stderr.
        A non-zero exit without such a marker only prints stderr as a
        warning and still returns stdout.
    """
    if not crate_path.exists():
        print(f"Error: Crate path {crate_path} does not exist", file=sys.stderr)
        return None

    command = ['cargo', 'llvm-cov', '--lib']
    if include_integration:
        command.append('--tests')

    # At most one output-mode flag is added; HTML takes precedence over the
    # missing-lines view, which takes precedence over the summary.
    if html:
        command.append('--html')
    elif show_uncovered:
        command.append('--show-missing-lines')
    elif not show_files:
        command.append('--summary-only')

    if test_filter:
        command += ['--', test_filter]
    elif include_integration:
        command += ['--', '--test-threads=1']

    print(f"Running coverage analysis for {crate_path.name}...")
    print(" Test types: unit + integration" if include_integration else " Test types: unit only")
    if test_filter:
        print(f" Test filter: {test_filter}")
    print()

    fatal_markers = (
        'error: could not compile',
        'error: failed to',
        'error: no such',
        'could not compile',
    )

    try:
        completed = subprocess.run(
            command,
            cwd=crate_path,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=os.environ.copy(),
        )
        if completed.returncode != 0:
            lowered = completed.stderr.lower()
            if any(marker in lowered for marker in fatal_markers):
                print("Error running coverage analysis:", file=sys.stderr)
                print(completed.stderr, file=sys.stderr)
                return None
            if completed.stderr:
                print("Warning from cargo-llvm-cov:", file=sys.stderr)
                print(completed.stderr, file=sys.stderr)
        return completed.stdout
    except subprocess.TimeoutExpired:
        print(f"Error: Coverage analysis timed out after {timeout} seconds", file=sys.stderr)
        return None
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return None
def print_coverage_report(coverage: Dict[str, float], goal: float, minimum: float, test_filter: Optional[str], filtered_cov: Optional[float] = None, crate_name: Optional[str] = None):
    """Print the coverage summary banner to standard output.

    Args:
        coverage: Overall figures with ``'line'``, ``'function'`` and
            ``'region'`` keys.
        goal: Goal threshold, passed to the status helpers.
        minimum: Minimum threshold, passed to the status helpers.
        test_filter: Echoed in the banner when truthy.
        filtered_cov: When not None, this value (instead of the overall line
            coverage) drives the status and is printed next to the overall
            line coverage; function and region figures are then omitted.
        crate_name: Label for the filtered figure; ``Filtered`` is used when
            it is falsy.
    """
    has_filtered = filtered_cov is not None
    line_cov = filtered_cov if has_filtered else coverage['line']
    status = determine_status(line_cov, goal, minimum)
    rule = "=" * 60

    print("\n" + rule)
    print("CODE COVERAGE REPORT")
    print(rule)
    if test_filter:
        print(f"Test Filter: {test_filter}")

    if has_filtered:
        if crate_name:
            print(f"\n{crate_name} Line Coverage: {line_cov:.2f}%")
        else:
            print(f"\nFiltered Line Coverage: {line_cov:.2f}%")
        print(f"Overall Line Coverage: {coverage['line']:.2f}%")
    else:
        print(f"\nLine Coverage: {line_cov:.2f}%")
        print(f"Function Coverage: {coverage['function']:.2f}%")
        print(f"Region Coverage: {coverage['region']:.2f}%")

    print(f"\nStatus: {status}\n")
    print_thresholds(goal, minimum)
    print_status_details(line_cov, goal, minimum)
    print(rule + "\n")
def determine_status(coverage: float, goal: float, minimum: float) -> str:
    """Classify *coverage* against the goal and minimum thresholds.

    Returns:
        The EXCELLENT label when ``coverage >= goal``, otherwise the WARNING
        label when ``coverage >= minimum``, otherwise the VIOLATION label.
    """
    # Checked in order, so the goal threshold takes precedence.
    tiers = ((goal, "✅ EXCELLENT"), (minimum, "⚠️ WARNING"))
    for threshold, label in tiers:
        if coverage >= threshold:
            return label
    return "❌ VIOLATION"
def print_thresholds(goal: float, minimum: float):
    """Print the goal and minimum thresholds, rounded to whole percentages."""
    rows = (
        "Thresholds:",
        f" Goal: {goal:.0f}% (target for excellent coverage)",
        f" Minimum: {minimum:.0f}% (hard limit)",
    )
    for row in rows:
        print(row)
def print_status_details(coverage: float, goal: float, minimum: float):
    """Print how *coverage* relates to the thresholds and the remaining gaps.

    Prints a single success line when the goal is met; otherwise prints the
    percentage points missing to reach the goal and, when the minimum is
    not met either, the points missing to reach the minimum as well.
    """
    if coverage >= goal:
        print("\n✓ Coverage meets goal threshold!")
        return

    goal_gap = goal - coverage
    if coverage >= minimum:
        print("\n⚠ Coverage meets minimum but below goal")
        print(f" Need {goal_gap:.2f}% more to reach goal")
        return

    min_gap = minimum - coverage
    print("\n✗ Coverage below minimum threshold")
    print(f" Need {min_gap:.2f}% more to meet minimum")
    print(f" Need {goal_gap:.2f}% more to reach goal")
def calculate_average_coverage(files: Dict[str, Dict[str, float]], filter_prefix: str) -> Optional[float]:
    """Average the line coverage of files whose path contains *filter_prefix*.

    The match is a substring test, not an anchored prefix test. Each file
    counts equally regardless of its size.

    Returns:
        The arithmetic mean of the matching ``'line'`` values, or None when
        no path matches.
    """
    matching = [stats['line'] for path, stats in files.items() if filter_prefix in path]
    if not matching:
        return None
    return sum(matching) / len(matching)
def calculate_filtered_coverage_stats(files: Dict[str, Dict[str, float]], crate_name: str) -> Optional[float]:
    """Average line coverage over the files that belong to *crate_name*.

    Files whose path starts with ``"<crate_name>/"`` are used when at least
    one exists; otherwise every file in *files* is averaged. Each file
    counts equally.

    Returns:
        The arithmetic mean of the selected ``'line'`` values, or None when
        *files* is empty.
    """
    crate_prefix = f"{crate_name}/"
    own_values = [stats['line'] for path, stats in files.items() if path.startswith(crate_prefix)]
    # Without any crate-prefixed path the report is treated as already
    # scoped to the crate, so all files take part.
    values = own_values if own_values else [stats['line'] for stats in files.values()]
    if not values:
        return None
    return sum(values) / len(values)
def extract_module_prefix(test_filter: Optional[str]) -> Optional[str]:
    """Turn a test filter such as ``domain::`` into a path fragment.

    Trailing colons are removed, every remaining ``::`` separator becomes
    ``/`` and a final ``/`` is appended, so ``llm::providers::`` yields
    ``llm/providers/``. The result is meant for substring matching against
    file paths, which never contain ``::``.

    Args:
        test_filter: The filter text, or None.

    Returns:
        The path fragment, or None when the filter is empty, None, or
        consists only of colons (which would otherwise produce ``/`` and
        match every path).
    """
    if not test_filter:
        return None
    module = test_filter.rstrip(':')
    if not module:
        return None
    return module.replace('::', '/') + '/'
def format_filename_with_prefix(filename: str, strip_prefix: Optional[str]) -> str:
    """Return *filename* with a leading *strip_prefix* removed, if present.

    The name is returned unchanged when *strip_prefix* is empty or None, or
    when *filename* does not start with it.
    """
    if not strip_prefix:
        return filename
    if not filename.startswith(strip_prefix):
        return filename
    return filename[len(strip_prefix):]
def format_uncovered_lines(line_numbers: List[int], max_display: Optional[int] = None) -> str:
    """Render line numbers as a compact, comma-separated list of ranges.

    Consecutive numbers are collapsed into ``start-end``; isolated numbers
    are printed on their own, e.g. ``[3, 1, 2, 7]`` becomes ``"1-3, 7"``.
    The input may be unsorted and may contain duplicates; duplicates are
    ignored so they cannot produce overlapping entries.

    Args:
        line_numbers: Line numbers to render.
        max_display: When truthy, the maximum number of characters of the
            rendered list to keep before it is truncated.

    Returns:
        The rendered text, or ``""`` for empty input. A truncated result
        ends with ``"... (<count> total lines)"`` where the count is the
        number of distinct lines.
    """
    if not line_numbers:
        return ""

    sorted_lines = sorted(set(line_numbers))

    def render(first: int, last: int) -> str:
        return str(first) if first == last else f"{first}-{last}"

    ranges = []
    start = end = sorted_lines[0]
    for num in sorted_lines[1:]:
        if num == end + 1:
            # Still inside the current consecutive run.
            end = num
            continue
        ranges.append(render(start, end))
        start = end = num
    ranges.append(render(start, end))

    result = ", ".join(ranges)
    if max_display and len(result) > max_display:
        return result[:max_display] + f"... ({len(sorted_lines)} total lines)"
    return result
def find_uncovered_for_file(filepath: str, uncovered_map: Dict[str, List[int]]) -> Optional[List[int]]:
    """Look up the uncovered lines recorded for *filepath*.

    An exact key match wins. Otherwise the first key (in the mapping's
    iteration order) that contains *filepath* as a substring is used.

    Returns:
        The matching list of line numbers, or None when no key matches.
    """
    try:
        return uncovered_map[filepath]
    except KeyError:
        pass

    # A key ending with `filepath` necessarily contains it, so the substring
    # test alone covers both the suffix and the "anywhere" case.
    return next(
        (numbers for candidate, numbers in uncovered_map.items() if filepath in candidate),
        None,
    )
def _print_file_group(group: Dict[str, Dict[str, float]], strip_prefix: Optional[str], uncovered_lines: Optional[Dict[str, List[int]]]) -> None:
    """Print one status row per file in *group*, sorted by path, plus its uncovered lines when known."""
    for filename, cov in sorted(group.items()):
        line_cov = cov['line']
        # Per-file icons use fixed 90 / 80 cut-offs.
        status = "✅" if line_cov >= 90 else "⚠️ " if line_cov >= 80 else "❌"
        display_name = format_filename_with_prefix(filename, strip_prefix)
        print(f" {status} {display_name:50s} {line_cov:6.2f}%")
        if uncovered_lines:
            uncov = find_uncovered_for_file(filename, uncovered_lines)
            if uncov:
                formatted_lines = format_uncovered_lines(uncov)
                print(f" Uncovered: {formatted_lines}")


def print_file_coverage(files: Dict[str, Dict[str, float]], show_all: bool, module_filter: Optional[str] = None, crate_name: Optional[str] = None, uncovered_lines: Optional[Dict[str, List[int]]] = None):
    """Print a per-file coverage listing grouped by module.

    Args:
        files: Mapping from file path to ``{'line': <percentage>}``.
        show_all: When false and *crate_name* is given, only files under
            ``"<crate_name>/"`` are listed (if any exist) and the
            ``"<crate_name>/src/"`` prefix is removed from displayed names.
            When true, the grouped listing is printed even if a
            *module_filter* is set.
        module_filter: Path fragment; files containing it are listed first,
            followed by their average coverage.
        crate_name: Crate directory name used for the filtering above.
        uncovered_lines: Optional mapping from path to uncovered line
            numbers, printed beneath each file that has an entry.
    """
    print("\nPer-File Coverage:")
    print("-" * 60)

    strip_prefix = None
    if not show_all and crate_name:
        crate_prefix = f"{crate_name}/"
        crate_files = {k: v for k, v in files.items() if k.startswith(crate_prefix)}
        # Only narrow the listing when the report actually uses
        # crate-prefixed paths; otherwise keep every file.
        if crate_files:
            files = crate_files
            strip_prefix = f"{crate_name}/src/"

    if module_filter:
        module_files = {k: v for k, v in files.items() if module_filter in k}
        if module_files:
            module_name = module_filter.rstrip('/').capitalize()
            print(f"\n{module_name} Files:")
            _print_file_group(module_files, strip_prefix, uncovered_lines)
            avg = calculate_average_coverage(files, module_filter)
            # Compare against None so that a genuine 0.00% average is shown.
            if avg is not None:
                print(f"\n Average coverage: {avg:.2f}%")
        else:
            print(f"\n No files found matching '{module_filter}'")

    if show_all or not module_filter:
        shown_files = set()
        common_modules = ['billing/', 'domain/', 'executor/', 'llm/', 'storage/', 'tools/', 'agents/']
        for prefix in common_modules:
            # A file is listed under the first module fragment it contains.
            module_files = {k: v for k, v in files.items() if prefix in k and k not in shown_files}
            if module_files:
                module_name = prefix.rstrip('/').capitalize()
                print(f"\n{module_name} Files:")
                _print_file_group(module_files, strip_prefix, uncovered_lines)
                shown_files.update(module_files)

        other_files = {k: v for k, v in files.items() if k not in shown_files}
        if other_files:
            print("\nOther Files:")
            _print_file_group(other_files, strip_prefix, uncovered_lines)

    print("-" * 60)
def main() -> int:
    """Run the coverage check end to end and return the process exit code.

    Returns:
        0 when the relevant line coverage meets the goal,
        1 when it meets the minimum but not the goal,
        2 when it is below the minimum or the arguments are invalid,
        3 when cargo-llvm-cov is missing, the run fails, or its output
        cannot be parsed.
    """
    parser = create_argument_parser()
    args = parser.parse_args()
    if not validate_arguments(args):
        return 2

    if not check_llvm_cov_installed():
        print("Error: cargo-llvm-cov is not installed", file=sys.stderr)
        print("Install it with: cargo install cargo-llvm-cov", file=sys.stderr)
        return 3

    crate_path = Path(args.crate_path).resolve()
    show_uncovered = args.show_uncovered
    # The uncovered-lines view is printed per file, so it implies the
    # per-file listing.
    show_files = args.show_files or show_uncovered

    output = run_coverage(crate_path, args.test_filter, args.html, show_files, args.include_integration, show_uncovered, args.timeout)
    if output is None:
        return 3

    lines = output.splitlines()
    coverage = None
    for line in lines:
        coverage = parse_coverage_line(line)
        if coverage:
            break
    if coverage is None:
        print("Error: Failed to parse coverage data", file=sys.stderr)
        print("\nOutput from coverage tool:")
        print(output)
        return 3

    files = parse_file_coverage(lines)
    uncovered = None
    if show_uncovered:
        uncovered = parse_uncovered_lines(lines)

    filtered_cov = None
    module_filter = None
    crate_filter_name = None
    if args.test_filter and files:
        module_filter = extract_module_prefix(args.test_filter)
        if module_filter:
            filtered_cov = calculate_average_coverage(files, module_filter)
    elif show_files and files:
        filtered_cov = calculate_filtered_coverage_stats(files, crate_path.name)
        crate_filter_name = crate_path.name

    print_coverage_report(coverage, args.goal, args.minimum, args.test_filter, filtered_cov, crate_filter_name)

    if show_files or args.test_filter:
        if files:
            print_file_coverage(files, False, module_filter, crate_path.name, uncovered)

    if args.html:
        html_file = crate_path / 'target' / 'llvm-cov' / 'html' / 'index.html'
        if html_file.exists():
            print(f"\nHTML report: {html_file}\n")

    # The exit code follows the same figure the report's status is based on:
    # the filtered average when one was computed, else the overall value.
    line_cov = filtered_cov if filtered_cov is not None else coverage['line']
    if line_cov >= args.goal:
        return 0
    if line_cov >= args.minimum:
        return 1
    return 2
def create_argument_parser():
    """Build the command-line parser for the coverage checker.

    Defines one optional positional argument (``crate_path``) and the
    options ``--test-filter``, ``--goal``, ``--minimum``, ``--html``,
    ``--show-files``, ``--show-uncovered``, ``--include-integration`` and
    ``--timeout``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    usage_examples = """
Examples:
%(prog)s
%(prog)s --test-filter providers::
%(prog)s --goal 90 --minimum 80 --html
%(prog)s --show-files
"""
    parser = argparse.ArgumentParser(
        description='Verify code coverage for multi-llm unit tests',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # (flags, keyword arguments) in the order they appear in --help.
    argument_specs = [
        (('crate_path',), {
            'nargs': '?',
            'default': '.',
            'help': 'Path to the crate to check (default: current directory)',
        }),
        (('--test-filter',), {
            'help': 'Test filter (e.g., domain::, session::)',
        }),
        (('--goal',), {
            'type': float,
            'default': 90.0,
            'help': 'Goal coverage percentage (default: 90.0)',
        }),
        (('--minimum',), {
            'type': float,
            'default': 80.0,
            'help': 'Minimum acceptable coverage percentage (default: 80.0)',
        }),
        (('--html',), {
            'action': 'store_true',
            'help': 'Generate HTML coverage report',
        }),
        (('--show-files',), {
            'action': 'store_true',
            'help': 'Show per-file coverage details (filtered to the specified crate)',
        }),
        (('--show-uncovered',), {
            'action': 'store_true',
            'help': 'Show uncovered lines for each file (detailed line-by-line view)',
        }),
        (('--include-integration',), {
            'action': 'store_true',
            'help': 'Include integration tests (tests/) in coverage analysis',
        }),
        (('--timeout',), {
            'type': int,
            'default': 300,
            'help': 'Timeout in seconds (default: 300)',
        }),
    ]
    for flags, settings in argument_specs:
        parser.add_argument(*flags, **settings)
    return parser
def validate_arguments(args) -> bool:
    """Check that the goal threshold is not below the minimum threshold.

    Args:
        args: Parsed arguments providing ``goal`` and ``minimum``.

    Returns:
        True when ``args.goal`` is not less than ``args.minimum``; otherwise
        prints an error to standard error and returns False.
    """
    thresholds_ordered = not (args.goal < args.minimum)
    if not thresholds_ordered:
        print("Error: Goal threshold must be >= minimum threshold", file=sys.stderr)
    return thresholds_ordered
# Script entry point: use the value returned by main() as the process exit
# status.
if __name__ == '__main__':
    sys.exit(main())