import pandas as pd
import sys
def load_distance_matrix(filepath):
with open(filepath, 'r') as f:
lines = [line for line in f if not line.startswith('#')]
from io import StringIO
df = pd.read_csv(StringIO(''.join(lines)), sep='\t', index_col=0)
return df
def manual_sequence_analysis():
sequences = {
'locus1': {
'ref': 'ATCGATCGATCGATCG', 'snp1': 'ATCGATCGATCGATGG', 'del1': 'ATCGATCGATCGTCG', 'del3': 'ATCGATCGTCG', 'ins1': 'ATCGATCGATCGATCGA', 'ins3': 'ATCGATCGATCGATCGAAA', },
'locus2': {
'ref': 'GGGGGGGGGGGGGGGG', 'snp1': 'GGGGGGGGGTGGGGGG', 'del2': 'GGGGGGGGGGGGGG', 'del5': 'GGGGGGGGGGG', 'ins2': 'GGGGGGGGGGGGGGGGCC', },
'locus3': {
'ref': 'CCCCCCCCCCCCCCCC', 'snp3': 'CCCCCTTCCCCCCCCC', 'del1': 'CCCCCCCCCCCCCCC', 'ins4': 'CCCCCCCCCCCCCCCCTTTT', 'complex1': 'CCCAAACCCCCCCCC', 'large_del': 'CCCCCCCC', }
}
sample_profiles = {
'Sample_Ref': ('ref', 'ref', 'ref'),
'Sample_Identical': ('ref', 'ref', 'ref'), 'Sample_SNPs_Only': ('snp1', 'snp1', 'snp3'),
'Sample_Dels_Only': ('del1', 'del2', 'del1'),
'Sample_Ins_Only': ('ins1', 'ins2', 'ins4'),
'Sample_Mixed1': ('snp2', 'del5', 'complex1'), }
expected_distances = {}
expected_distances[('Sample_Ref', 'Sample_SNPs_Only')] = {
'hamming': 3,
'snps': 4,
'snps_indel_contiguous': 4,
'snps_indel_bases': 4
}
expected_distances[('Sample_Ref', 'Sample_Dels_Only')] = {
'hamming': 3,
'snps': 3, 'snps_indel_contiguous': 3, 'snps_indel_bases': 4 }
expected_distances[('Sample_Ref', 'Sample_Ins_Only')] = {
'hamming': 3,
'snps': 3, 'snps_indel_contiguous': 3, 'snps_indel_bases': 7 }
return expected_distances
def validate_cgdist():
matrices = {
'hamming': load_distance_matrix('results/crc32_hamming.tsv'),
'snps': load_distance_matrix('results/crc32_snps.tsv'),
'snps_indel_contiguous': load_distance_matrix('results/crc32_snps_indel_contiguous.tsv'),
'snps_indel_bases': load_distance_matrix('results/crc32_snps_indel_bases.tsv')
}
expected_distances = manual_sequence_analysis()
print("=" * 80)
print("CORRECTED cgDist VALIDATION RESULTS")
print("=" * 80)
all_passed = True
test_pair = ('Sample_Ref', 'Sample_Identical')
print(f"\n📊 Testing: {test_pair[0]} vs {test_pair[1]}")
print(" Expected: All distances = 0 (identical sequences)")
print("-" * 60)
for mode, matrix in matrices.items():
actual = int(matrix.loc[test_pair[0], test_pair[1]])
if actual == 0:
status = "✅ PASS"
else:
status = "❌ FAIL"
all_passed = False
print(f" {mode:20s}: Expected=0, Actual={actual} {status}")
for pair, expected in expected_distances.items():
print(f"\n📊 Testing: {pair[0]} vs {pair[1]}")
print("-" * 60)
test_passed = True
for mode, exp_val in expected.items():
actual = int(matrices[mode].loc[pair[0], pair[1]])
if actual == exp_val:
status = "✅ PASS"
else:
status = "❌ FAIL"
test_passed = False
all_passed = False
print(f" {mode:20s}: Expected={exp_val:3d}, Actual={actual:3d} {status}")
if test_passed:
print(" Overall: ✅ TEST PASSED")
else:
print(" Overall: ❌ TEST FAILED")
print("\n" + "=" * 80)
print("MATHEMATICAL INVARIANT CHECK (cgDist ≥ Hamming):")
print("=" * 80)
invariant_violations = 0
for sample1 in matrices['hamming'].index:
for sample2 in matrices['hamming'].columns:
if sample1 != sample2: hamming_dist = int(matrices['hamming'].loc[sample1, sample2])
snps_dist = int(matrices['snps'].loc[sample1, sample2])
if snps_dist < hamming_dist:
print(f"❌ VIOLATED: {sample1} vs {sample2}: SNPs({snps_dist}) < Hamming({hamming_dist})")
invariant_violations += 1
if invariant_violations == 0:
print("✅ Mathematical invariant maintained for all pairs")
else:
print(f"❌ Found {invariant_violations} invariant violations")
all_passed = False
print("\n" + "=" * 80)
print("DISTANCE MATRIX SUMMARY:")
print("=" * 80)
print("\\nDistance from Sample_Ref to all samples:")
for mode, matrix in matrices.items():
print(f"\\n{mode.upper()}:")
ref_distances = matrix.loc['Sample_Ref'].sort_values()
for sample, dist in ref_distances.items():
if sample != 'Sample_Ref':
print(f" {sample:18s}: {int(dist):3d}")
print("\n" + "=" * 80)
if all_passed:
print("🎉 ALL VALIDATION TESTS PASSED!")
print("✅ cgDist is correctly calculating SNPs, InDel events, and InDel bases")
print("✅ Mathematical invariants are maintained")
print("✅ Parasail alignment integration is working correctly")
else:
print("⚠️ SOME TESTS FAILED")
print("=" * 80)
return all_passed
if __name__ == '__main__':
success = validate_cgdist()
sys.exit(0 if success else 1)