import os
import sys
import re
import argparse
from pathlib import Path
def find_test_files(crate_path):
test_files = []
src_path = Path(crate_path) / 'src'
if not src_path.exists():
print(f"Error: {src_path} does not exist", file=sys.stderr)
return None
for test_file in src_path.rglob('tests/**/*.rs'):
if test_file.name != 'mod.rs':
test_files.append(test_file)
for rs_file in src_path.rglob('*.rs'):
if '/tests/' not in str(rs_file):
with open(rs_file, 'r') as f:
if '#[cfg(test)]' in f.read():
test_files.append(rs_file)
return test_files
def check_unit_documentation(file_path):
with open(file_path, 'r') as f:
content = f.read()
if not ('#[test]' in content or '#[tokio::test]' in content):
return {'has_tests': False}
has_unit_under_test = bool(re.search(r'UNITS? UNDER TEST:', content))
has_business_responsibility = bool(re.search(r'BUSINESS RESPONSIBILITY:', content))
has_test_coverage = bool(re.search(r'TEST COVERAGE:', content))
missing = []
if not has_unit_under_test:
missing.append('UNIT UNDER TEST')
if not has_business_responsibility:
missing.append('BUSINESS RESPONSIBILITY')
if not has_test_coverage:
missing.append('TEST COVERAGE')
return {
'has_tests': True,
'has_documentation': len(missing) == 0,
'missing': missing
}
def check_import_patterns(file_path):
with open(file_path, 'r') as f:
lines = f.readlines()
issues = []
in_function = False
brace_count = 0
for i, line in enumerate(lines, 1):
stripped = line.strip()
if re.match(r'(pub\s+)?fn\s+\w+', stripped):
in_function = True
brace_count = 0
if in_function:
brace_count += stripped.count('{') - stripped.count('}')
if brace_count <= 0:
in_function = False
if in_function and stripped.startswith('use '):
issues.append({
'line': i,
'issue': 'Inline use statement inside function (should be at top of file)',
'severity': 'violation'
})
return issues
def check_aaa_pattern(file_path):
with open(file_path, 'r') as f:
content = f.read()
suggestions = []
test_pattern = r'#\[(tokio::)?test\]\s*(?:async\s+)?fn\s+(\w+)'
test_matches = list(re.finditer(test_pattern, content))
for match in test_matches:
func_name = match.group(2)
func_start = match.start()
func_body_start = content.find('{', func_start)
if func_body_start == -1:
continue
brace_count = 1
pos = func_body_start + 1
while pos < len(content) and brace_count > 0:
if content[pos] == '{':
brace_count += 1
elif content[pos] == '}':
brace_count -= 1
pos += 1
func_body = content[func_body_start:pos]
has_arrange = '// Arrange' in func_body or '//Arrange' in func_body
has_act = '// Act' in func_body or '//Act' in func_body
has_assert = '// Assert' in func_body or '//Assert' in func_body
if len(func_body.splitlines()) > 10 and not (has_arrange or has_act or has_assert):
line_num = content[:func_start].count('\n') + 1
suggestions.append({
'line': line_num,
'function': func_name,
'issue': 'Consider adding AAA (Arrange-Act-Assert) comments for clarity',
'severity': 'info'
})
return suggestions
def check_trait_compliance(file_path):
with open(file_path, 'r') as f:
content = f.read()
suggestions = []
trait_pattern = r'pub\s+trait\s+(\w+)'
traits = [m.group(1) for m in re.finditer(trait_pattern, content)]
if not traits:
return suggestions
if 'trait_compliance_tests' not in content and 'trait_compliance' not in content:
impl_pattern = r'impl\s+\w+\s+for\s+(\w+)'
implementations = [m.group(1) for m in re.finditer(impl_pattern, content)]
if len(implementations) > 1:
suggestions.append({
'issue': f'Multiple implementations found ({len(implementations)}) but no trait_compliance_tests module',
'severity': 'warning',
'details': 'Consider adding trait compliance tests for consistency'
})
return suggestions
def check_todo_tests(file_path):
src_file = str(file_path).replace('/tests/', '/')
if '/tests/' in str(file_path):
parts = Path(file_path).parts
try:
tests_idx = parts.index('tests')
src_parts = list(parts[:tests_idx]) + list(parts[tests_idx + 1:])
src_file = Path(*src_parts)
except (ValueError, IndexError):
return []
if not src_file.exists():
return []
with open(src_file, 'r') as f:
src_content = f.read()
with open(file_path, 'r') as f:
test_content = f.read()
suggestions = []
todo_pattern = r'//\s*TODO[:\(]([^\\n]+)'
todos = list(re.finditer(todo_pattern, src_content))
if todos:
has_todo_tests = 'TODO' in test_content and 'test' in test_content.lower()
if not has_todo_tests and len(todos) > 2:
suggestions.append({
'issue': f'Source file has {len(todos)} TODO comment(s) but no corresponding TODO tests',
'severity': 'info',
'details': 'Consider adding tests that document TODO behavior'
})
return suggestions
return []
def main():
parser = argparse.ArgumentParser(
description='Verify unit tests follow multi-llm testing patterns'
)
parser.add_argument(
'crate_path',
help='Path to the crate to check (e.g., multi-llm)'
)
parser.add_argument(
'--strict',
action='store_true',
help='Treat warnings as violations'
)
args = parser.parse_args()
test_files = find_test_files(args.crate_path)
if test_files is None:
return 2
if not test_files:
print("⚠️ No test files found")
return 1
all_violations = []
all_warnings = []
all_suggestions = []
print(f"Checking test patterns in {len(test_files)} test file(s)...\n")
for test_file in test_files:
rel_path = test_file.relative_to(Path(args.crate_path) / 'src')
doc_check = check_unit_documentation(test_file)
if doc_check['has_tests'] and not doc_check['has_documentation']:
all_violations.append({
'file': str(rel_path),
'issue': f"Missing required documentation: {', '.join(doc_check['missing'])}",
'severity': 'violation'
})
import_issues = check_import_patterns(test_file)
for issue in import_issues:
if issue['severity'] == 'violation':
all_violations.append({
'file': str(rel_path),
'line': issue['line'],
'issue': issue['issue']
})
aaa_suggestions = check_aaa_pattern(test_file)
all_suggestions.extend([{
'file': str(rel_path),
**s
} for s in aaa_suggestions])
trait_suggestions = check_trait_compliance(test_file)
for suggestion in trait_suggestions:
if suggestion['severity'] == 'warning':
all_warnings.append({
'file': str(rel_path),
**suggestion
})
else:
all_suggestions.append({
'file': str(rel_path),
**suggestion
})
todo_suggestions = check_todo_tests(test_file)
all_suggestions.extend([{
'file': str(rel_path),
**s
} for s in todo_suggestions])
exit_code = 0
if all_violations:
print(f"❌ VIOLATIONS: {len(all_violations)} pattern violation(s):")
print(" These MUST be fixed.\n")
for violation in all_violations[:20]:
file_info = f"{violation['file']}"
if 'line' in violation:
file_info += f":{violation['line']}"
print(f" {file_info}")
print(f" {violation['issue']}")
if len(all_violations) > 20:
print(f"\n ... and {len(all_violations) - 20} more")
exit_code = 2
print()
if all_warnings or (args.strict and all_suggestions):
warning_list = all_warnings + (all_suggestions if args.strict else [])
print(f"{'⚠️ ' if not all_violations else ''}WARNINGS: {len(warning_list)} pattern warning(s):")
print(" These should be fixed but don't block progress.\n")
for warning in warning_list[:15]:
print(f" {warning['file']}")
print(f" {warning['issue']}")
if 'details' in warning:
print(f" {warning['details']}")
if len(warning_list) > 15:
print(f"\n ... and {len(warning_list) - 15} more")
if exit_code == 0:
exit_code = 1
print()
if all_suggestions and not args.strict:
print(f"{'ℹ️ ' if exit_code == 0 else ''}INFO: {len(all_suggestions)} suggestion(s):")
print(" Consider these improvements.\n")
for suggestion in all_suggestions[:10]:
print(f" {suggestion['file']}")
if 'function' in suggestion:
print(f" Function: {suggestion['function']}")
print(f" {suggestion['issue']}")
if 'details' in suggestion:
print(f" {suggestion['details']}")
if len(all_suggestions) > 10:
print(f"\n ... and {len(all_suggestions) - 10} more")
print()
if exit_code == 0:
print("✓ All tests follow pattern conventions")
return exit_code
if __name__ == '__main__':
sys.exit(main())