import json
import sys
import os
from pathlib import Path
from typing import Dict, List, Any, Tuple
import argparse
def load_debtmap_json(filepath: str) -> Dict[str, Any]:
    """Load a debtmap JSON report and verify it has the expected 'items' key."""
    try:
        with open(filepath, 'r') as f:
            data = json.load(f)
        if 'items' not in data:
            raise ValueError(f"Missing 'items' key in {filepath}")
        return data
    except FileNotFoundError:
        raise FileNotFoundError(f"Debtmap file not found: {filepath}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {filepath}: {e}")
def extract_metrics(data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize a debtmap report: bucket items by score and total up complexity and coverage gaps."""
    items = data.get('items', [])
    critical_items = []
    high_priority_items = []
    medium_priority_items = []
    low_priority_items = []
    total_complexity = 0
    total_coverage_gaps = 0
    functions_with_gaps = 0

    for item in items:
        # Bucket each item by its unified priority score.
        score = item.get('unified_score', {}).get('final_score', 0)
        if score >= 8:
            critical_items.append(item)
        elif score >= 6:
            high_priority_items.append(item)
        elif score >= 4:
            medium_priority_items.append(item)
        else:
            low_priority_items.append(item)

        # Accumulate cyclomatic complexity and, for poorly covered testing
        # gaps, how far coverage falls below 100%.
        debt_type = item.get('debt_type', {})
        if 'ComplexFunction' in debt_type:
            total_complexity += debt_type['ComplexFunction'].get('cyclomatic', 0)
        elif 'TestingGap' in debt_type:
            gap = debt_type['TestingGap']
            total_complexity += gap.get('cyclomatic', 0)
            coverage = gap.get('coverage', 100)
            if coverage < 50:
                functions_with_gaps += 1
                total_coverage_gaps += (100 - coverage)

    avg_score = (
        sum(item.get('unified_score', {}).get('final_score', 0) for item in items) / len(items)
        if items else 0
    )
    return {
        'total_items': len(items),
        'critical_items': len(critical_items),
        'high_priority_items': len(high_priority_items),
        'medium_priority_items': len(medium_priority_items),
        'low_priority_items': len(low_priority_items),
        'average_score': round(avg_score, 2),
        'total_complexity': total_complexity,
        'functions_with_gaps': functions_with_gaps,
        'total_coverage_gaps': total_coverage_gaps,
        'total_debt_score': data.get('total_debt_score', 0),
        'overall_coverage': data.get('overall_coverage', 100),
    }
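# For instance, a report containing only the single hypothetical item sketched
# above would yield: total_items=1, critical_items=1 (8.6 >= 8),
# total_complexity=12, functions_with_gaps=1 (coverage 30 < 50),
# total_coverage_gaps=70, average_score=8.6.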
def identify_improvements(before_items: List[Dict], after_items: List[Dict]) -> Dict[str, Any]:
    """Diff two item lists and classify each item as resolved, improved, new, or still critical."""
    def make_key(item):
        # Items are matched across the two runs by file, function, and line.
        loc = item.get('location', {})
        return f"{loc.get('file')}:{loc.get('function')}:{loc.get('line')}"

    before_map = {make_key(item): item for item in before_items}
    after_map = {make_key(item): item for item in after_items}
    resolved_items = []
    improved_items = []
    new_items = []
    unchanged_critical = []

    for key, before_item in before_map.items():
        before_score = before_item.get('unified_score', {}).get('final_score', 0)
        if key not in after_map:
            # Item disappeared entirely: treat it as resolved.
            resolved_items.append({
                'location': key,
                'score': before_score,
                'description': before_item.get('recommendation', {}).get('primary_action', 'Fixed')
            })
        else:
            after_item = after_map[key]
            after_score = after_item.get('unified_score', {}).get('final_score', 0)
            if after_score < before_score:
                improved_items.append({
                    'location': key,
                    'before_score': before_score,
                    'after_score': after_score,
                    'improvement': before_score - after_score
                })
            elif before_score >= 8 and after_score >= 8:
                unchanged_critical.append({
                    'location': key,
                    'score': after_score,
                    'item': after_item
                })

    # Anything present only in the "after" run is a newly introduced item.
    for key, after_item in after_map.items():
        if key not in before_map:
            score = after_item.get('unified_score', {}).get('final_score', 0)
            new_items.append({
                'location': key,
                'score': score,
                'item': after_item
            })

    return {
        'resolved': resolved_items,
        'improved': improved_items,
        'new': new_items,
        'unchanged_critical': unchanged_critical
    }
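# Keys look like "src/parser.rs:parse_block:88", so an item that merely moves to
# a different line is counted as resolved-plus-new rather than matched; that is
# a consequence of the keying scheme above, not extra logic.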
def calculate_improvement_score(
    before_metrics: Dict[str, Any],
    after_metrics: Dict[str, Any],
    improvements: Dict[str, Any]
) -> Tuple[float, List[str], List[str]]:
    """Combine four sub-scores (each 0-100) into a weighted overall improvement score."""
    improvement_notes = []
    remaining_issues = []
    scores = {}

    # Share of critical items (score >= 8) that were resolved.
    resolved_critical = sum(1 for item in improvements['resolved'] if item['score'] >= 8)
    resolved_high = sum(1 for item in improvements['resolved'] if item['score'] >= 6)  # includes criticals; not weighted below
    total_critical_before = before_metrics['critical_items']
    if total_critical_before > 0:
        critical_resolution_rate = resolved_critical / total_critical_before
        scores['critical_resolution'] = critical_resolution_rate * 100
        if resolved_critical > 0:
            improvement_notes.append(f"Resolved {resolved_critical} critical debt items")
    else:
        scores['critical_resolution'] = 100

    # Drop in the average debt score, doubled so a 50% reduction scores 100.
    score_improvement = max(0, before_metrics['average_score'] - after_metrics['average_score'])
    score_improvement_pct = (score_improvement / before_metrics['average_score'] * 100) if before_metrics['average_score'] > 0 else 0
    scores['overall_improvement'] = min(100, score_improvement_pct * 2)
    if score_improvement > 0:
        improvement_notes.append(f"Reduced average debt score from {before_metrics['average_score']:.1f} to {after_metrics['average_score']:.1f}")

    # Percentage drop in total cyclomatic complexity.
    complexity_reduction = max(0, before_metrics['total_complexity'] - after_metrics['total_complexity'])
    complexity_pct = (complexity_reduction / before_metrics['total_complexity'] * 100) if before_metrics['total_complexity'] > 0 else 0
    scores['complexity_reduction'] = min(100, complexity_pct)
    if complexity_reduction > 0:
        improvement_notes.append(f"Reduced total complexity by {complexity_pct:.0f}%")

    # Penalize 25 points per newly introduced critical item.
    new_critical = sum(1 for item in improvements['new'] if item['score'] >= 8)
    scores['no_regression'] = 100 if new_critical == 0 else max(0, 100 - (new_critical * 25))
    if new_critical > 0:
        remaining_issues.append(f"{new_critical} new critical debt items introduced")
    if improvements['unchanged_critical']:
        count = len(improvements['unchanged_critical'])
        remaining_issues.append(f"{count} critical debt items still present")

    improvement_score = (
        scores['critical_resolution'] * 0.4 +
        scores['overall_improvement'] * 0.3 +
        scores['complexity_reduction'] * 0.2 +
        scores['no_regression'] * 0.1
    )
    return improvement_score, improvement_notes, remaining_issues
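# A worked example of the weighting above (numbers are hypothetical): with
# critical_resolution=50, overall_improvement=20, complexity_reduction=30, and
# no_regression=100, the result is 50*0.4 + 20*0.3 + 30*0.2 + 100*0.1 = 42.0,
# which falls in the 'incomplete' band used by main() below.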
def identify_gaps(improvements: Dict[str, Any]) -> Dict[str, Any]:
    """Describe the most pressing leftover problems: lingering critical items and regressions."""
    gaps = {}

    # Report up to three critical items that survived the refactoring.
    for idx, item_info in enumerate(improvements['unchanged_critical'][:3]):
        item = item_info['item']
        location = item.get('location', {})
        gap_key = f"critical_debt_remaining_{idx + 1}"
        gaps[gap_key] = {
            'description': f"High-priority debt item still present: {item.get('recommendation', {}).get('primary_action', 'Unknown issue')}",
            'location': f"{location.get('file')}:{location.get('function')}:{location.get('line')}",
            'severity': 'critical',
            'suggested_fix': item.get('recommendation', {}).get('primary_action', 'Apply functional patterns to reduce complexity'),
            'original_score': item_info['score'],
            'current_score': item_info['score']
        }

    # Report up to two critical items that the refactoring introduced.
    new_critical = [item for item in improvements['new'] if item['score'] >= 8]
    for idx, item_info in enumerate(new_critical[:2]):
        item = item_info['item']
        location = item.get('location', {})
        gap_key = f"regression_detected_{idx + 1}"
        gaps[gap_key] = {
            'description': "New complexity introduced during refactoring",
            'location': f"{location.get('file')}:{location.get('function')}:{location.get('line')}",
            'severity': 'high',
            'suggested_fix': "Simplify the newly added code using functional patterns",
            'original_score': None,
            'current_score': item_info['score']
        }

    return gaps
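# A resulting gap entry might look like this (values hypothetical):
#   "critical_debt_remaining_1": {
#       "description": "High-priority debt item still present: Extract helper functions",
#       "location": "src/parser.rs:parse_block:88",
#       "severity": "critical",
#       "suggested_fix": "Extract helper functions",
#       "original_score": 8.6,
#       "current_score": 8.6
#   }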
def main():
    parser = argparse.ArgumentParser(description='Validate debtmap improvements')
    parser.add_argument('--before', required=True, help='Path to before debtmap JSON')
    parser.add_argument('--after', required=True, help='Path to after debtmap JSON')
    parser.add_argument('--output', default='.prodigy/debtmap-validation.json',
                        help='Output path for validation results')

    # Suppress interactive output when run under Prodigy automation.
    is_automated = (os.environ.get('PRODIGY_AUTOMATION') == 'true'
                    or os.environ.get('PRODIGY_VALIDATION') == 'true')

    try:
        if len(sys.argv) > 1:
            args = parser.parse_args()
        else:
            # No CLI arguments: fall back to the ARGUMENTS environment
            # variable, then to the default .prodigy paths.
            import shlex
            args_str = os.environ.get('ARGUMENTS', '')
            if not args_str:
                args_str = '--before .prodigy/debtmap-before.json --after .prodigy/debtmap-after.json --output .prodigy/debtmap-validation.json'
            args = parser.parse_args(shlex.split(args_str))

        if not is_automated:
            print(f"Loading debtmap data from {args.before} and {args.after}...")

        before_data = load_debtmap_json(args.before)
        after_data = load_debtmap_json(args.after)
        before_metrics = extract_metrics(before_data)
        after_metrics = extract_metrics(after_data)
        improvements = identify_improvements(
            before_data.get('items', []),
            after_data.get('items', [])
        )
        score, improvement_notes, remaining_issues = calculate_improvement_score(
            before_metrics, after_metrics, improvements
        )

        # Map the numeric score onto a coarse status band.
        if score >= 75:
            status = 'complete'
        elif score >= 40:
            status = 'incomplete'
        else:
            status = 'failed'

        gaps = {}
        if score < 75:
            gaps = identify_gaps(improvements)
        result = {
            'completion_percentage': round(score, 1),
            'status': status,
            'improvements': improvement_notes,
            'remaining_issues': remaining_issues,
            'gaps': gaps,
            # The critical (score >= 8) count is reported under the
            # 'high_priority_items' key of the output schema.
            'before_summary': {
                'total_items': before_metrics['total_items'],
                'high_priority_items': before_metrics['critical_items'],
                'average_score': before_metrics['average_score']
            },
            'after_summary': {
                'total_items': after_metrics['total_items'],
                'high_priority_items': after_metrics['critical_items'],
                'average_score': after_metrics['average_score']
            }
        }
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w') as f:
            json.dump(result, f, indent=2)

        if not is_automated:
            print(f"\nValidation complete: {score:.1f}% improvement")
            print(f"Status: {status}")
            print(f"Results written to: {output_path}")
            if improvement_notes:
                print("\nImprovements:")
                for note in improvement_notes:
                    print(f"  ✓ {note}")
            if remaining_issues:
                print("\nRemaining issues:")
                for issue in remaining_issues:
                    print(f"  • {issue}")

        sys.exit(0 if status == 'complete' else 1)
    except Exception as e:
        # On any failure, still emit a machine-readable result so downstream
        # steps can distinguish a failed validation from a missing one.
        error_result = {
            'completion_percentage': 0.0,
            'status': 'failed',
            'improvements': [],
            'remaining_issues': [f"Validation error: {str(e)}"],
            'gaps': {},
            'raw_output': str(e)
        }
        try:
            output_path = args.output if 'args' in locals() else '.prodigy/debtmap-validation.json'
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'w') as f:
                json.dump(error_result, f, indent=2)
        except Exception:
            # Writing the error report is best-effort; never mask the original error.
            pass
        if not is_automated:
            print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
if __name__ == '__main__':
    main()
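# Typical invocations (the script filename here is illustrative, not part of the source):
#   python validate_debtmap.py --before .prodigy/debtmap-before.json --after .prodigy/debtmap-after.json
#   PRODIGY_AUTOMATION=true ARGUMENTS='--before before.json --after after.json' python validate_debtmap.py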