import sys
import re
import enum
import argparse
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from scipy.constants.constants import alpha
class Distance(enum.Enum):
edit = 1
gap_linear = 2
gap_affine = 3
gap_affine_2p = 4
class Penalties:
def __init__(self):
self.distance = Distance.edit
self.match = 0
self.mismatch = 0
self.indel = 0
self.gap_open1 = 0
self.gap_extend1 = 0
self.gap_open2 = 0
self.gap_extend2 = 0
def cigar_compute_score_edit(cigar_vector,penalties,ignore_misms):
score = 0
for op in cigar_vector:
if op[1] in "DI": score += int(op[0])
if op[1] in "X" and not ignore_misms: score += int(op[0])
return score
def cigar_compute_score_linear(cigar_vector,penalties,ignore_misms):
score = 0
for op in cigar_vector:
if op[1] == "M": score -= int(op[0]) * penalties.match
if op[1] == "X" and not ignore_misms: score -= int(op[0]) * penalties.mismatch
if op[1] in "DI": score -= int(op[0]) * penalties.indel
return score
def cigar_compute_score_affine(cigar_vector,penalties,ignore_misms):
score = 0
for op in cigar_vector:
if op[1] == "M": score -= int(op[0]) * penalties.match
if op[1] == "X" and not ignore_misms: score -= int(op[0]) * penalties.mismatch
if op[1] in "DI": score -= penalties.gap_open1 + int(op[0]) * penalties.gap_extend1
return score
def cigar_compute_score_affine_2p(cigar_vector,penalties,ignore_misms):
score = 0
for op in cigar_vector:
if op[1] == "M": score -= int(op[0]) * penalties.match
if op[1] == "X" and not ignore_misms: score -= int(op[0]) * penalties.mismatch
if op[1] in "DI":
score1 = penalties.gap_open1 + int(op[0]) * penalties.gap_extend1
score2 = penalties.gap_open2 + int(op[0]) * penalties.gap_extend2
score -= min(score1,score2)
return score
def cigar_compute_score(cigar,penalties,ignore_misms):
cigar_vector = re.findall(r'(\d+)([MXDI])',cigar)
if penalties.distance == Distance.edit:
return cigar_compute_score_edit(cigar_vector,penalties,ignore_misms)
elif penalties.distance == Distance.gap_linear:
return cigar_compute_score_linear(cigar_vector,penalties,ignore_misms)
elif penalties.distance == Distance.gap_affine:
return cigar_compute_score_affine(cigar_vector,penalties,ignore_misms)
elif penalties.distance == Distance.gap_affine_2p:
return cigar_compute_score_affine_2p(cigar_vector,penalties,ignore_misms)
class AlignmentStats:
def __init__(self):
self.score_same = 0
self.score_best1 = 0
self.score_best2 = 0
self.scores1 = list()
self.scores2 = list()
def __str__(self):
total_alignments = float(self.score_same + self.score_best1 + self.score_best2)
str = "[WFACompareAlignments::Stats]\n"
str += " => Alignments.common %d (%2.1f %%)\n" % (self.score_same,100.0*float(self.score_same)/total_alignments)
str += " => Alignments.best1 %d (%2.1f %%)\n" % (self.score_best1,100.0*float(self.score_best1)/total_alignments)
str += " => Alignments.best2 %d (%2.1f %%)" % (self.score_best2,100.0*float(self.score_best2)/total_alignments)
return str
def __repr__(self):
return __str__(self)
def plot_score_distribution(stats,input_path1,input_path2):
matplotlib.use('Agg')
fig,ax1 = plt.subplots()
ax1.set_xlabel('Score')
ax1.set_ylabel('Total Count')
range_min = min(min(stats.scores1),min(stats.scores2))
range_max = max(max(stats.scores1),max(stats.scores2))
n, bins, patches = ax1.hist(stats.scores1,50,range=[range_min,range_max],color="royalblue",edgecolor='black',alpha=0.5)
n, bins, patches = ax1.hist(stats.scores2,50,range=[range_min,range_max],color="darkorange",edgecolor='black',alpha=0.5)
start, end = ax1.get_xlim()
ax1.set_xticks(np.arange(start,end,(end-start)/5))
handles = [Rectangle((0,0),1,1,color=c,ec="k") for c in ["royalblue","darkorange"]]
labels= [input_path1,input_path2]
plt.legend(handles,labels,loc="upper left")
plt.title("Score distribution for '%s' and '%s'" % (input_path1,input_path2))
fig.savefig("wfaCmpAlg." + input_path1 + "." + input_path2 + ".png",
format='png',dpi=100,bbox_inches='tight')
def compare_alignments(input_path1,input_path2,penalties,use_score,ignore_misms,verbose):
stats = AlignmentStats()
input_file1 = open(input_path1,"rt")
input_file2 = open(input_path2,"rt")
line_no = 0
while True:
line_no += 1
line1 = input_file1.readline()
if not line1: break line2 = input_file2.readline()
if not line2:
print("[WFACompareAlignments] Files unsynch")
exit(-1)
fields1 = line1.split()
fields2 = line2.split()
if use_score:
cigar1 = None
cigar2 = None
score1 = int(fields1[0]) if len(fields1)<=2 else int(fields1[2])
score2 = int(fields2[0]) if len(fields2)<=2 else int(fields2[2])
else:
cigar1 = fields1[1] if len(fields1)<=2 else fields1[5]
cigar2 = fields2[1] if len(fields2)<=2 else fields2[5]
score1 = cigar_compute_score(cigar1,penalties,ignore_misms)
score2 = cigar_compute_score(cigar2,penalties,ignore_misms)
stats.scores1.append(score1)
stats.scores2.append(score2)
if score1 == score2:
stats.score_same += 1
elif score1 > score2:
stats.score_best1 += 1
else:
stats.score_best2 += 1
if verbose and score1 != score2:
print(">Failed::%d" % line_no)
print(" s1=%d\t%s" % (score1,cigar1))
print(" s2=%d\t%s" % (score2,cigar2))
input_file1.close()
input_file2.close()
return stats
parser = argparse.ArgumentParser()
parser.add_argument('-i1','--input1',required=True,help='Input file1 (*.alg)')
parser.add_argument('-i2','--input2',required=True,help='Input file2 (*.alg)')
parser.add_argument('-e','--edit',
help='Score alignments using edit distance (--edit)')
parser.add_argument('-l','--gap-linear',
help='Score alignments using gap-linear distance (--gap-linear=M,X,I)')
parser.add_argument('-g','--gap-affine',
help='Score alignments using gap-affine distance (--gap-affine=M,X,O,E)')
parser.add_argument('-G','--gap-affine-2p',
help='Score alignments using gap-affine-2p distance (--gap-affine-2p=M,X,O1,E1,O2,E2)')
parser.add_argument('--use-score', action='store_true',default=False,
help='Compares the score provided in the file (default=use-cigar)')
parser.add_argument('--ignore-misms', action='store_true',default=False,
help='Ignores mismatches when computing the score (default=disable)')
parser.add_argument('-p','--plot', action='store_true',default=False,
help='Plots score distribution of both inputs (default=disable)')
parser.add_argument('-v','--verbose', action='store_true',default=False,
help='Display comparison differences (default=disable)')
parser.add_argument('-H',action='store_true',dest="human_readable",default=False)
args = parser.parse_args()
penalties = Penalties()
if args.edit is not None:
pass elif args.gap_linear is not None:
values = args.gap_linear.split(',')
penalties.distance = Distance.gap_linear
penalties.match = int(values[0])
penalties.mismatch = int(values[1])
penalties.indel = int(values[2])
elif args.gap_affine is not None:
values = args.gap_affine.split(',')
penalties.distance = Distance.gap_affine
penalties.match = int(values[0])
penalties.mismatch = int(values[1])
penalties.gap_open1 = int(values[2])
penalties.gap_extend1 = int(values[3])
elif args.gap_affine_2p is not None:
values = args.gap_affine_2p.split(',')
penalties.distance = Distance.gap_affine_2p
penalties.match = int(values[0])
penalties.mismatch = int(values[1])
penalties.gap_open1 = int(values[2])
penalties.gap_extend1 = int(values[3])
penalties.gap_open2 = int(values[4])
penalties.gap_extend2 = int(values[5])
else:
print("[WFACompareAlignments] No distance provided. Using edit-distance (default)")
if penalties.match > 0:
print("[WFACompareAlignments] Match penalty must be negative or zero")
exit(-1)
if penalties.mismatch < 0 or \
penalties.mismatch < 0 or \
penalties.gap_open1 < 0 or \
penalties.gap_extend1 < 0 or \
penalties.gap_open2 < 0 or \
penalties.gap_extend2 < 0:
print("[WFACompareAlignments] Penalties must be positive or zero")
exit(-1)
stats = compare_alignments(
args.input1,args.input2,penalties,
args.use_score,args.ignore_misms,args.verbose)
print(stats)
if args.plot:
plot_score_distribution(stats,args.input1,args.input2)