BEFORE_FILE=""
AFTER_FILE=""
OUTPUT_FILE=".prodigy/debtmap-validation.json"
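# Accept flags either from $ARGUMENTS (assumed to be supplied by the calling
# Prodigy workflow) or from the positional parameters. Word splitting of
# $ARGUMENTS is intentional so each flag and value becomes its own array element.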
if [ -n "$ARGUMENTS" ]; then
    ARGS_ARRAY=($ARGUMENTS)
else
    ARGS_ARRAY=("$@")
fi
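# Walk the argument array, consuming --flag value pairs; unrecognized tokens are skipped.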
i=0
while [ "$i" -lt "${#ARGS_ARRAY[@]}" ]; do
    case "${ARGS_ARRAY[$i]}" in
        --before)
            BEFORE_FILE="${ARGS_ARRAY[$((i+1))]}"
            i=$((i+2))
            ;;
        --after)
            AFTER_FILE="${ARGS_ARRAY[$((i+1))]}"
            i=$((i+2))
            ;;
        --output)
            OUTPUT_FILE="${ARGS_ARRAY[$((i+1))]}"
            i=$((i+2))
            ;;
        *)
            i=$((i+1))
            ;;
    esac
done
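# Both input reports are required. On any failure below, still write a parseable
# "failed" validation JSON before exiting non-zero so callers always get a result file.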
if [ -z "$BEFORE_FILE" ] || [ -z "$AFTER_FILE" ]; then
    echo "Error: Missing required parameters" >&2
    echo "Usage: --before <before-json> --after <after-json> [--output <output-file>]" >&2
    mkdir -p "$(dirname "$OUTPUT_FILE")"
    echo '{"completion_percentage": 0.0, "status": "failed", "improvements": [], "remaining_issues": ["Missing required parameters"], "gaps": {}}' > "$OUTPUT_FILE"
    exit 1
fi
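# Each input path must point to an existing debtmap JSON report.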
if [ ! -f "$BEFORE_FILE" ]; then
    echo "Error: Before file not found: $BEFORE_FILE" >&2
    mkdir -p "$(dirname "$OUTPUT_FILE")"
    echo '{"completion_percentage": 0.0, "status": "failed", "improvements": [], "remaining_issues": ["Before file not found"], "gaps": {}}' > "$OUTPUT_FILE"
    exit 1
fi
if [ ! -f "$AFTER_FILE" ]; then
    echo "Error: After file not found: $AFTER_FILE" >&2
    mkdir -p "$(dirname "$OUTPUT_FILE")"
    echo '{"completion_percentage": 0.0, "status": "failed", "improvements": [], "remaining_issues": ["After file not found"], "gaps": {}}' > "$OUTPUT_FILE"
    exit 1
fi
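# Suppress progress output when running under Prodigy automation or validation.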
IS_AUTOMATION=false
if [ "$PRODIGY_AUTOMATION" = "true" ] || [ "$PRODIGY_VALIDATION" = "true" ]; then
    IS_AUTOMATION=true
fi
if [ "$IS_AUTOMATION" = "false" ]; then
    echo "Validating debtmap improvement..."
    echo " Before: $BEFORE_FILE"
    echo " After: $AFTER_FILE"
    echo " Output: $OUTPUT_FILE"
fi
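# Run the comparison in an embedded Python script. The three paths are passed as
# arguments (sys.argv); the heredoc delimiter is quoted, so the script body itself
# undergoes no shell expansion.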
mkdir -p "$(dirname "$OUTPUT_FILE")"
python3 - "$BEFORE_FILE" "$AFTER_FILE" "$OUTPUT_FILE" << 'PYTHON_SCRIPT'
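# Compare two debtmap reports and write a validation summary containing
# completion_percentage, status, improvements, remaining_issues, and gaps.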
import json
import sys
import os
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
def load_json(filepath: str) -> Optional[Dict]:
    """Load and parse a JSON file."""
    try:
        with open(filepath, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return None
def get_debt_items(data: Dict) -> List[Dict]:
    """Extract debt items from debtmap output."""
    # Handle the standard debtmap output structure
    if 'technical_debt' in data and 'debt_items' in data['technical_debt']:
        return data['technical_debt']['debt_items']
    # Alternative structures
    elif 'debt_items' in data:
        return data['debt_items']
    elif 'technical_debt' in data and 'items' in data['technical_debt']:
        return data['technical_debt']['items']
    elif 'analysis' in data and 'debt_items' in data['analysis']:
        return data['analysis']['debt_items']
    return []
def get_overall_metrics(data: Dict) -> Dict:
    """Extract overall metrics from debtmap output."""
    metrics = {
        'total_items': 0,
        'high_priority_items': 0,
        'critical_items': 0,
        'average_score': 0.0,
        'total_score': 0.0,
        'max_complexity': 0,
        'average_complexity': 0.0
    }
    debt_items = get_debt_items(data)
    if not debt_items:
        return metrics
    metrics['total_items'] = len(debt_items)
    scores = []
    complexities = []
    for item in debt_items:
        score = item.get('score', item.get('severity_score', 0))
        if isinstance(score, str):
            try:
                score = float(score)
            except ValueError:
                score = 0
        scores.append(score)
        # Count priorities
        if score >= 8:
            metrics['critical_items'] += 1
            metrics['high_priority_items'] += 1
        elif score >= 6:
            metrics['high_priority_items'] += 1
        # Extract complexity if available
        complexity = item.get('complexity', item.get('cyclomatic_complexity', 0))
        if complexity:
            complexities.append(complexity)
    if scores:
        metrics['average_score'] = sum(scores) / len(scores)
        metrics['total_score'] = sum(scores)
    if complexities:
        metrics['average_complexity'] = sum(complexities) / len(complexities)
        metrics['max_complexity'] = max(complexities)
    return metrics
def compare_debt_items(before_items: List[Dict], after_items: List[Dict]) -> Dict:
    """Compare debt items to identify improvements and regressions."""
    def item_key(item: Dict) -> str:
        """Generate a unique key for a debt item."""
        # Handle standard debtmap output structure
        location = item.get('file_path', item.get('location', item.get('file', '')))
        function = item.get('function_name', item.get('function', ''))
        issue_type = item.get('issue_type', '')
        return f"{location}:{function}:{issue_type}"
    before_map = {item_key(item): item for item in before_items}
    after_map = {item_key(item): item for item in after_items}
    resolved_items = []
    improved_items = []
    unchanged_critical = []
    new_items = []
    worsened_items = []
    # Check for resolved and improved items
    for key, before_item in before_map.items():
        if key not in after_map:
            # Item was resolved
            resolved_items.append(before_item)
        else:
            after_item = after_map[key]
            before_score = before_item.get('score', before_item.get('severity_score', 0))
            after_score = after_item.get('score', after_item.get('severity_score', 0))
            if isinstance(before_score, str):
                before_score = float(before_score) if before_score else 0
            if isinstance(after_score, str):
                after_score = float(after_score) if after_score else 0
            if after_score < before_score:
                improved_items.append({
                    'item': after_item,
                    'before_score': before_score,
                    'after_score': after_score
                })
            elif after_score > before_score:
                worsened_items.append({
                    'item': after_item,
                    'before_score': before_score,
                    'after_score': after_score
                })
            elif before_score >= 8:
                # Critical item unchanged
                unchanged_critical.append(before_item)
    # Check for new items
    for key, after_item in after_map.items():
        if key not in before_map:
            new_items.append(after_item)
    return {
        'resolved': resolved_items,
        'improved': improved_items,
        'unchanged_critical': unchanged_critical,
        'new': new_items,
        'worsened': worsened_items
    }
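# Scoring rubric: a weighted blend of resolved critical items (40%), average
# debt-score reduction (30%), complexity reduction (20%), and absence of
# regressions (10%); each component is scored 0-100 before weighting.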
def calculate_improvement_score(before_metrics: Dict, after_metrics: Dict, comparison: Dict) -> Tuple[float, List[str], List[str], Dict]:
    """Calculate improvement score and identify gaps."""
    improvements = []
    remaining_issues = []
    gaps = {}
    # Weight factors for scoring
    weights = {
        'resolved_critical': 0.4,
        'overall_improvement': 0.3,
        'complexity_reduction': 0.2,
        'no_regression': 0.1
    }
    scores = {}
    # 1. Resolved critical items
    total_critical_before = before_metrics['critical_items']
    resolved_critical = len([item for item in comparison['resolved']
                             if item.get('score', item.get('severity_score', 0)) >= 8])
    if total_critical_before > 0:
        scores['resolved_critical'] = (resolved_critical / total_critical_before) * 100
        if resolved_critical > 0:
            improvements.append(f"Resolved {resolved_critical} critical debt items")
    else:
        scores['resolved_critical'] = 100 if after_metrics['critical_items'] == 0 else 0
    # 2. Overall score improvement
    if before_metrics['average_score'] > 0:
        score_reduction = (before_metrics['average_score'] - after_metrics['average_score']) / before_metrics['average_score']
        scores['overall_improvement'] = max(0, score_reduction * 100)
        if score_reduction > 0.1:
            improvements.append(f"Reduced average debt score by {score_reduction*100:.1f}%")
    else:
        scores['overall_improvement'] = 100 if after_metrics['average_score'] == 0 else 0
    # 3. Complexity reduction
    if before_metrics['average_complexity'] > 0:
        complexity_reduction = (before_metrics['average_complexity'] - after_metrics['average_complexity']) / before_metrics['average_complexity']
        scores['complexity_reduction'] = max(0, complexity_reduction * 100)
        if complexity_reduction > 0.1:
            improvements.append(f"Reduced average complexity by {complexity_reduction*100:.1f}%")
    else:
        scores['complexity_reduction'] = 100 if after_metrics['average_complexity'] == 0 else 50
    # 4. No regression penalty
    new_critical = len([item for item in comparison['new']
                        if item.get('score', item.get('severity_score', 0)) >= 8])
    worsened_count = len(comparison['worsened'])
    if new_critical > 0 or worsened_count > 0:
        scores['no_regression'] = 0
        if new_critical > 0:
            remaining_issues.append(f"{new_critical} new critical debt items introduced")
        if worsened_count > 0:
            remaining_issues.append(f"{worsened_count} debt items worsened")
    else:
        scores['no_regression'] = 100
    # Calculate weighted average
    total_score = sum(scores[key] * weights[key] for key in weights)
    # Document improvements
    if len(comparison['resolved']) > 0:
        improvements.append(f"Resolved {len(comparison['resolved'])} debt items")
    if len(comparison['improved']) > 0:
        improvements.append(f"Improved {len(comparison['improved'])} debt items")
    # Document remaining issues and gaps
    for item in comparison['unchanged_critical']:
        # Handle standard debtmap output structure
        location = item.get('file_path', item.get('location', item.get('file', 'unknown')))
        function = item.get('function_name', item.get('function', 'unknown'))
        line_number = item.get('line_number', item.get('line', ''))
        score = item.get('score', item.get('severity_score', 0))
        issue_type = item.get('issue_type', 'Unknown issue')
        gap_key = f"critical_debt_{location.replace('/', '_').replace('.', '_')}_{function}"
        gaps[gap_key] = {
            'description': f"Critical debt item: {issue_type}",
            'location': f"{location}:{function}:{line_number}" if line_number else f"{location}:{function}",
            'severity': 'critical',
            'suggested_fix': item.get('recommendation', 'Apply functional programming patterns to reduce complexity'),
            'original_score': score,
            'current_score': score
        }
        remaining_issues.append(f"Critical debt in {location}:{function}")
    # Check for insufficient improvement
    if after_metrics['high_priority_items'] > 0:
        remaining_issues.append(f"{after_metrics['high_priority_items']} high-priority items remain")
    return total_score, improvements, remaining_issues, gaps
def main():
    # Get file paths from command-line arguments
    if len(sys.argv) < 4:
        print("Error: Missing file paths")
        sys.exit(1)
    before_file = sys.argv[1]
    after_file = sys.argv[2]
    output_file = sys.argv[3]
    # Load JSON files
    before_data = load_json(before_file)
    after_data = load_json(after_file)
    if not before_data or not after_data:
        result = {
            'completion_percentage': 0.0,
            'status': 'failed',
            'improvements': [],
            'remaining_issues': ['Failed to load debtmap JSON files'],
            'gaps': {}
        }
        with open(output_file, 'w') as f:
            json.dump(result, f, indent=2)
        return
    # Extract metrics
    before_metrics = get_overall_metrics(before_data)
    after_metrics = get_overall_metrics(after_data)
    # Get debt items for comparison
    before_items = get_debt_items(before_data)
    after_items = get_debt_items(after_data)
    # Compare items
    comparison = compare_debt_items(before_items, after_items)
    # Calculate improvement score
    improvement_score, improvements, remaining_issues, gaps = calculate_improvement_score(
        before_metrics, after_metrics, comparison
    )
    # Determine status
    if improvement_score >= 75:
        status = 'complete'
    elif improvement_score >= 40:
        status = 'incomplete'
    else:
        status = 'insufficient'
    # Build result
    result = {
        'completion_percentage': round(improvement_score, 1),
        'status': status,
        'improvements': improvements,
        'remaining_issues': remaining_issues,
        'gaps': gaps,
        'before_summary': {
            'total_items': before_metrics['total_items'],
            'high_priority_items': before_metrics['high_priority_items'],
            'critical_items': before_metrics['critical_items'],
            'average_score': round(before_metrics['average_score'], 2),
            'average_complexity': round(before_metrics['average_complexity'], 2)
        },
        'after_summary': {
            'total_items': after_metrics['total_items'],
            'high_priority_items': after_metrics['high_priority_items'],
            'critical_items': after_metrics['critical_items'],
            'average_score': round(after_metrics['average_score'], 2),
            'average_complexity': round(after_metrics['average_complexity'], 2)
        }
    }
    # Write result to output file
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)
    # Print summary (not JSON) only in non-automation mode
    is_automation = os.environ.get('PRODIGY_AUTOMATION') == 'true' or \
        os.environ.get('PRODIGY_VALIDATION') == 'true'
    if not is_automation:
        print(f"\nValidation complete: {improvement_score:.1f}% improvement")
        print(f"Status: {status}")
        if improvements:
            print("\nImprovements made:")
            for imp in improvements[:3]:
                print(f" ✓ {imp}")
        if remaining_issues and improvement_score < 75:
            print("\nRemaining issues:")
            for issue in remaining_issues[:3]:
                print(f" ✗ {issue}")
        print(f"\nValidation result written to: {output_file}")

if __name__ == '__main__':
    main()
PYTHON_SCRIPT
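# Exit 0 unconditionally: the validation outcome is conveyed via the JSON written
# to $OUTPUT_FILE rather than via this command's exit code.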
exit 0