cimport cython
from cpython.mem cimport PyMem_Malloc, PyMem_Free
import re
cimport cedlib
class NeedsAlphabetMapping(Exception):
pass
def _map_ascii_string(s):
if isinstance(s, bytes):
return s
elif isinstance(s, str):
s_bytes = s.encode('utf-8')
if len(s_bytes) == len(s):
return s_bytes
raise NeedsAlphabetMapping()
def _map_to_bytes(query, target, additional_equalities):
cdef bytes query_bytes
cdef bytes target_bytes
try:
query_bytes = _map_ascii_string(query)
target_bytes = _map_ascii_string(target)
except NeedsAlphabetMapping:
query_vals = set(query)
target_vals = set(target)
input_mapping = {
c: chr(idx)
for idx, c in enumerate(query_vals.union(target_vals))
}
if len(input_mapping) > 256:
raise ValueError(
"query and target combined have more than 256 unique values, "
"this is not supported.")
map_seq = lambda seq: ''.join(input_mapping[x] for x in seq).encode('ascii')
query_bytes = map_seq(query)
target_bytes = map_seq(target)
if additional_equalities is not None:
additional_equalities = [
(input_mapping[a], input_mapping[b])
for a, b in additional_equalities
if a in input_mapping and b in input_mapping]
return query_bytes, target_bytes, additional_equalities
def align(query, target, mode="NW", task="distance", k=-1, additionalEqualities=None):
cdef bytes query_bytes
cdef bytes target_bytes
query_bytes, target_bytes, additionalEqualities = _map_to_bytes(
query, target, additionalEqualities)
cdef char* cquery = query_bytes;
cdef char* ctarget = target_bytes;
cconfig = cedlib.edlibDefaultAlignConfig()
if k is not None: cconfig.k = k
if mode == 'NW': cconfig.mode = cedlib.EDLIB_MODE_NW
if mode == 'HW': cconfig.mode = cedlib.EDLIB_MODE_HW
if mode == 'SHW': cconfig.mode = cedlib.EDLIB_MODE_SHW
if task == 'distance': cconfig.task = cedlib.EDLIB_TASK_DISTANCE
if task == 'locations': cconfig.task = cedlib.EDLIB_TASK_LOC
if task == 'path': cconfig.task = cedlib.EDLIB_TASK_PATH
cdef bytes tmp_bytes;
cdef char* tmp_cstring;
cdef cedlib.EdlibEqualityPair* c_additionalEqualities = NULL
if additionalEqualities is None:
cconfig.additionalEqualities = NULL
cconfig.additionalEqualitiesLength = 0
else:
c_additionalEqualities = <cedlib.EdlibEqualityPair*> PyMem_Malloc(len(additionalEqualities)
* cython.sizeof(cedlib.EdlibEqualityPair))
for i in range(len(additionalEqualities)):
c_additionalEqualities[i].first = bytearray(additionalEqualities[i][0].encode('utf-8'))[0]
c_additionalEqualities[i].second = bytearray(additionalEqualities[i][1].encode('utf-8'))[0]
cconfig.additionalEqualities = c_additionalEqualities
cconfig.additionalEqualitiesLength = len(additionalEqualities)
query_len = len(query)
target_len = len(target)
with nogil:
cresult = cedlib.edlibAlign(cquery, query_len, ctarget, target_len, cconfig)
if c_additionalEqualities != NULL: PyMem_Free(c_additionalEqualities)
if cresult.status == 1:
raise Exception("There was an error.")
locations = []
if cresult.numLocations >= 0:
for i in range(cresult.numLocations):
locations.append((cresult.startLocations[i] if cresult.startLocations else None,
cresult.endLocations[i] if cresult.endLocations else None))
cigar = None
if cresult.alignment:
ccigar = cedlib.edlibAlignmentToCigar(cresult.alignment, cresult.alignmentLength,
cedlib.EDLIB_CIGAR_EXTENDED)
cigar = <bytes> ccigar
cigar = cigar.decode('UTF-8')
result = {
'editDistance': cresult.editDistance,
'alphabetLength': cresult.alphabetLength,
'locations': locations,
'cigar': cigar
}
cedlib.edlibFreeAlignResult(cresult)
return result
def getNiceAlignment(alignResult, query, target, gapSymbol="-"):
if type(alignResult) is not dict:
raise Exception("The object alignResult is expected to be a python dictionary. Please check the input alignResult.")
if 'locations' not in alignResult.keys():
raise Exception("The object alignResult is expected to contain a field 'locations'. Please check the input alignResult.")
target_pos = alignResult["locations"][0][0]
if target_pos == None:
target_pos = 0
query_pos = 0 target_aln = match_aln = query_aln = ""
if 'cigar' not in alignResult.keys():
raise Exception("The object alignResult is expected to contain a CIGAR string. Please check the input alignResult.")
cigar = alignResult["cigar"]
if alignResult["cigar"] == '' or alignResult["cigar"] == None:
raise Exception("The object alignResult contains an empty CIGAR string. Users must run align() with task='path'. Please check the input alignResult.")
for num_occurrences, alignment_operation in re.findall("(\d+)(\D)", cigar):
num_occurrences = int(num_occurrences)
if alignment_operation == "=":
target_aln += target[target_pos : target_pos + num_occurrences]
target_pos += num_occurrences
query_aln += query[query_pos : query_pos + num_occurrences]
query_pos += num_occurrences
match_aln += "|" * num_occurrences
elif alignment_operation == "X":
target_aln += target[target_pos : target_pos + num_occurrences]
target_pos += num_occurrences
query_aln += query[query_pos : query_pos + num_occurrences]
query_pos += num_occurrences
match_aln += "." * num_occurrences
elif alignment_operation == "D":
target_aln += target[target_pos : target_pos + num_occurrences]
target_pos += num_occurrences
query_aln += gapSymbol * num_occurrences
query_pos += 0
match_aln +=gapSymbol * num_occurrences
elif alignment_operation == "I":
target_aln += gapSymbol * num_occurrences
target_pos += 0
query_aln += query[query_pos : query_pos + num_occurrences]
query_pos += num_occurrences
match_aln += gapSymbol * num_occurrences
else:
raise Exception("The CIGAR string from alignResult contains a symbol not '=', 'X', 'D', 'I'. Please check the validity of alignResult and alignResult.cigar")
alignments = {
'query_aligned': query_aln,
'matched_aligned': match_aln,
'target_aligned': target_aln
}
return alignments