from collections import namedtuple, deque
import difflib
import pygments.formatters
import pygments.lexers
import pygments.token
import re
from typing import List, Tuple, Optional, Iterator, Iterable
from literate.annot import Span, Annot, SpanMerger, \
cut_annot, merge_annot, sub_annot, fill_annot
from literate.file import File, Line, Diff, DiffBlock, Hunk, OutputLine
from literate.points import Point, cut_annot_at_points
# Matches a maximal run of one repeated character that is neither a space nor
# a newline; group 1 captures that character.
RUN_RE = re.compile(r'([^ \n])\1*')
def parse_intra_annot(s: str) -> Annot[str]:
    '''Convert the text of an `ndiff` guide (`?`) line into labelled spans.

    Every run of one repeated marker character found by `RUN_RE` becomes a
    `Span(start, end, label)`.  `start` and `end` are offsets into `s`, and
    the label is `'ins'` for `+`, `'del'` for `-` and `'chg'` for `^`.

    Runs of whitespace other than plain spaces (tabs, for example) are
    skipped.  `ndiff` can copy such characters from the compared line into
    the guide line; they mark no edit, and previously made the label lookup
    fail.

    Raises:
        KeyError: if `s` contains a non-whitespace character that is not one
            of the three markers.
    '''
    labels = {
        '+': 'ins',
        '-': 'del',
        '^': 'chg',
    }
    spans = []
    for m in RUN_RE.finditer(s):
        marker = m.group(1)
        if marker.isspace():
            # Whitespace carried over from the source line, not an edit marker.
            continue
        spans.append(Span(m.start(), m.end(), labels[marker]))
    return spans
# One row of line-diff output.  Code in this file unpacks it as
# (old_line, new_line, old_detail, new_detail): two flags followed by an
# optional annotation for each side.
DiffLine = Tuple[bool, bool, Optional[Annot[str]], Optional[Annot[str]]]
def diff_lines(old_lines: List[str], new_lines: List[str]) -> Iterator[DiffLine]:
    '''Diff two lists of lines with `difflib.ndiff`, yielding one row per line.

    Each row is `(old_line, new_line, old_detail, new_detail)`.  The two flags
    say whether the row consumes a line from the old / new input, and the
    details are intraline annotations from `parse_intra_annot`, or `None`:

    - `(True, True, None, None)`: unchanged context line.
    - `(True, False, None, None)`: line only in `old_lines`.
    - `(False, True, None, None)`: line only in `new_lines`.
    - `(True, True, old_detail, new_detail)`: a delete/insert pair reported
      as one changed line; at least one detail is not `None`.
    '''
    # Rows are buffered because `ndiff` output can only be classified in
    # hindsight: a `- ` line may later gain a `? ` guide line and/or be paired
    # with the `+ ` line that follows it.
    buf = deque()

    for dl in difflib.ndiff(old_lines, new_lines):
        # `ndiff` prefixes every line with a two-character code.
        prefix = dl[0:2]

        if prefix == '  ':
            # Context line: nothing buffered can change any more.
            yield from buf
            buf.clear()
            yield (True, True, None, None)

        elif prefix == '- ':
            yield from buf
            buf.clear()
            buf.append((True, False, None, None))

        elif prefix == '+ ':
            if len(buf) > 0:
                old_line, new_line, old_detail, new_detail = buf[-1]
                if not new_line and old_detail is not None:
                    # The buffered row is a deletion that already has a guide
                    # line, so this insertion is its other half.
                    buf[-1] = (old_line, True, old_detail, None)
                    continue
            # No folding possible; keep at most two older rows buffered.
            while len(buf) > 2:
                yield buf.popleft()
            buf.append((False, True, None, None))

        elif prefix == '? ':
            detail = parse_intra_annot(dl[2:])

            # The guide line describes the row buffered last.
            assert len(buf) > 0
            old_line, new_line, old_detail, new_detail = buf.pop()

            if new_line:
                if old_line:
                    # Already a folded delete + insert pair, which only exists
                    # when the delete had detail; this one is for the insert.
                    assert old_detail is not None
                    buf.append((True, True, old_detail, detail))
                else:
                    # A bare insertion: merge it with the bare deletion that
                    # must precede it.
                    old_line2, new_line2, old_detail2, new_detail2 = buf.pop()
                    assert old_line2
                    assert not new_line2
                    assert old_detail2 is None
                    assert new_detail2 is None
                    buf.append((True, True, None, detail))
            else:
                # Guide line for a deletion.
                buf.append((True, False, detail, None))

    # End of input: whatever is still buffered is final.
    yield from buf
    buf.clear()
def adjust_closing_brace(old_lines: List[str], new_lines: List[str],
        diff: Iterable[DiffLine]) -> Iterator[DiffLine]:
    '''Re-anchor runs of insertions or deletions that end just before a `}`.

    When a run of pure insertions (or pure deletions) is followed by a
    two-sided row whose text is only `}` plus whitespace, the run is searched
    from its top for an identical line.  If one is found, that earlier row is
    emitted as context and the run's row is emitted last instead, so the diff
    reads

         fn f() {
         }
        +fn g() {
        +}

    rather than marking the first `}` as inserted and the last as context.
    All other rows pass through unchanged and in order.
    '''
    context_row = (True, True, None, None)

    run_kind = None        # 'ins', 'del', or None while outside a run
    pending = []           # rows of the current run, not yet yielded
    pending_start = None   # index of the run's first line in its own file
    old_idx = -1
    new_idx = -1

    for row in diff:
        has_old, has_new, _old_detail, _new_detail = row
        if has_old and not has_new:
            kind = 'del'
            old_idx += 1
        elif has_new and not has_old:
            kind = 'ins'
            new_idx += 1
        else:
            kind = None
            old_idx += 1
            new_idx += 1

        if kind == run_kind:
            # Same state as before: pass through, or extend the run.
            if run_kind is None:
                yield row
            else:
                pending.append(row)
            continue

        if kind is not None:
            # A run begins.  If it directly follows a run of the other kind,
            # that run can no longer be adjusted, so release it first
            # (`pending` is empty when coming from context).
            yield from pending
            run_kind = kind
            pending = [row]
            pending_start = new_idx if kind == 'ins' else old_idx
            continue

        # A run has just ended on a two-sided row.
        lines = new_lines if run_kind == 'ins' else old_lines
        closer = lines[new_idx if run_kind == 'ins' else old_idx]
        if closer.strip() == '}':
            hit = next((k for k in range(len(pending))
                        if lines[pending_start + k] == closer), None)
            if hit is None:
                yield from pending
                yield context_row
            else:
                # Swap roles: the matching line becomes context and its row
                # moves to the end of the run.
                yield from pending[:hit]
                yield context_row
                yield from pending[hit + 1:]
                yield pending[hit]
        else:
            yield from pending
            yield row

        run_kind = None
        pending = []
        pending_start = None

    # No row follows, so nothing can trigger the adjustment any more.
    yield from pending
# Zero-width pattern that matches at every word boundary (`\b`).
WORD_BREAK_RE = re.compile(r'\b')
def token_annot(line: Line) -> Annot[None]:
    '''Split `line` into token spans, with extra cuts at word boundaries.

    The starting point is `fill_annot(line.highlight, len(line.text))`.  Spans
    labelled exactly `pygments.token.String`, or labelled with a token type
    contained in `pygments.token.Comment`, are additionally cut at every word
    boundary of their text.
    '''
    tokens = fill_annot(line.highlight, len(line.text))

    def cut_at_words(label):
        # `==` accepts only `String` itself, while `in` on a Pygments token
        # type also accepts its subtypes.
        return label == pygments.token.String or \
                label in pygments.token.Comment

    # Word-boundary offsets are relative to the token text, so shift them by
    # the token's start to get positions within the line.
    word_cuts = [
        Point(tok.start + brk.start())
        for tok in tokens
        if cut_at_words(tok.label)
        for brk in WORD_BREAK_RE.finditer(line.text[tok.start : tok.end])
    ]
    return cut_annot_at_points(tokens, word_cuts)
def calc_tokenized_intra(l1: Line, l2: Line) -> Tuple[Annot[str], Annot[str]]:
    '''Compute token-level intraline edit annotations for a pair of lines.

    Both lines are tokenized with `token_annot` and their token texts are
    compared with `difflib.SequenceMatcher`.  Each non-equal opcode gives at
    most one span per side, covering the affected tokens after leading and
    trailing all-whitespace tokens are trimmed.  Spans are labelled `'chg'`
    for a `replace` opcode, otherwise `'del'` on the `l1` side and `'ins'` on
    the `l2` side.

    Returns:
        `(intra1, intra2)`: the span lists for `l1` and `l2`.
    '''
    def trim_blank(tokens, lo, hi):
        # Shrink [lo, hi) until it neither starts nor ends on a token that
        # consists only of whitespace.
        while lo < hi and tokens[lo].isspace():
            lo += 1
        while hi > lo and tokens[hi - 1].isspace():
            hi -= 1
        return lo, hi

    old_spans = token_annot(l1)
    new_spans = token_annot(l2)
    old_tokens = [l1.text[sp.start : sp.end] for sp in old_spans]
    new_tokens = [l2.text[sp.start : sp.end] for sp in new_spans]

    old_intra = []
    new_intra = []

    matcher = difflib.SequenceMatcher(a=old_tokens, b=new_tokens)
    for tag, a_lo, a_hi, b_lo, b_hi in matcher.get_opcodes():
        if tag == 'equal':
            continue

        replaced = tag == 'replace'
        a_lo, a_hi = trim_blank(old_tokens, a_lo, a_hi)
        b_lo, b_hi = trim_blank(new_tokens, b_lo, b_hi)

        if a_lo != a_hi:
            old_intra.append(Span(
                old_spans[a_lo].start, old_spans[a_hi - 1].end,
                'chg' if replaced else 'del'))
        if b_lo != b_hi:
            new_intra.append(Span(
                new_spans[b_lo].start, new_spans[b_hi - 1].end,
                'chg' if replaced else 'ins'))

    return (old_intra, new_intra)
def diff_files(f1: File, f2: File) -> Diff:
    '''Diff two files and group the result into `DiffBlock`s.

    The rows from `diff_lines`, post-processed by `adjust_closing_brace`, are
    folded into blocks of consecutive rows that are all changed or all
    unchanged.  For rows carrying intraline detail, `calc_tokenized_intra` is
    run on the two lines and any non-empty result is stored on them with
    `set_intra`.

    Returns:
        `Diff(f1, f2, blocks)` with the accumulated blocks.
    '''
    rows = diff_lines(f1.line_text, f2.line_text)
    rows = adjust_closing_brace(f1.line_text, f2.line_text, rows)

    blocks = []
    # Start of the block being accumulated, per file.
    old_mark = 0
    new_mark = 0
    # Index of the next line to consume, per file.
    old_pos = 0
    new_pos = 0
    # Whether the block being accumulated is a change (otherwise context).
    in_change = True

    def emit_pending():
        nonlocal old_mark, new_mark
        # Empty blocks are never recorded, so callers may call this freely.
        if old_pos > old_mark or new_pos > new_mark:
            blocks.append(DiffBlock(in_change,
                Span(old_mark, old_pos),
                Span(new_mark, new_pos)))
        old_mark = old_pos
        new_mark = new_pos

    for has_old, has_new, old_detail, new_detail in rows:
        has_intra = old_detail is not None or new_detail is not None
        # A row is unchanged only if it is on both sides and has no detail.
        row_changed = has_intra or not (has_old and has_new)

        if row_changed != in_change:
            emit_pending()

        if has_intra:
            # Close the block so far, so this row starts a new one.
            emit_pending()
            old_obj = f1.lines[old_pos]
            new_obj = f2.lines[new_pos]
            old_intra, new_intra = calc_tokenized_intra(old_obj, new_obj)
            if old_intra:
                old_obj.set_intra(old_intra)
            if new_intra:
                new_obj.set_intra(new_intra)
            # NOTE(review): no position has advanced since the call above, so
            # this call finds nothing to emit; kept to preserve the original
            # call sequence.
            emit_pending()

        if has_old:
            old_pos += 1
        if has_new:
            new_pos += 1
        in_change = row_changed

    emit_pending()

    return Diff(f1, f2, blocks)
def context_annot(blocks: List[DiffBlock], new: bool, context_lines: int) -> Annot[None]:
    '''Annotate the changed regions of one file, widened by `context_lines`.

    For every changed block, the new-file span (if `new`) or old-file span
    (otherwise) is extended by `context_lines` on both ends and fed to a
    `SpanMerger`.  Unchanged blocks contribute nothing.  The widened spans are
    not clamped here, so they may reach below zero or past the last line.
    '''
    merger = SpanMerger()
    for is_changed, old_span, new_span in blocks:
        if is_changed:
            side = new_span if new else old_span
            merger.add(Span(
                side.start - context_lines,
                side.end + context_lines))
    return merger.finish()
def split_hunks(blocks: List[DiffBlock]) -> List[Hunk]:
    '''Group blocks into `Hunk`s of contiguous blocks.

    A block continues the current hunk only if both its old and new span
    start exactly where the previous block's spans ended (or at 0 for the
    first block).  Otherwise the current hunk is closed and a new one begins.
    Empty hunks are never produced.
    '''
    hunks = []
    run = []
    expect_old = 0
    expect_new = 0

    for block in blocks:
        _changed, old_span, new_span = block
        contiguous = (old_span.start == expect_old and
                new_span.start == expect_new)
        if not contiguous and run:
            hunks.append(Hunk(run))
            run = []
        run.append(block)
        expect_old = old_span.end
        expect_new = new_span.end

    if run:
        hunks.append(Hunk(run))
    return hunks
def annotate_blocks(blocks: List[DiffBlock]) \
        -> Tuple[Annot[Span[None]], Annot[Span[None]]]:
    '''Build two annotations, one per file, whose spans are labelled with
    their block.

    For each block `b`, the first list gets a span over `b.old_span` and the
    second a span over `b.new_span`, both with `b` as label.  Entries at the
    same index of the two lists therefore refer to the same block.
    '''
    pairs = [
        (Span(blk.old_span.start, blk.old_span.end, blk),
         Span(blk.new_span.start, blk.new_span.end, blk))
        for blk in blocks
    ]
    old_side = [old for old, _ in pairs]
    new_side = [new for _, new in pairs]
    return old_side, new_side
def build_diff_hunks(d: Diff, context_diff: bool=True, context_lines: int=5):
    '''Compute the hunks to display for `d` and store them with `d.set_hunks`.

    Args:
        d: the diff whose `blocks`, `old_file` and `new_file` are read.
        context_diff: if true, keep only changed blocks plus surrounding
            context (and any `keep_mark_lines` of each file).  Otherwise keep
            everything up to the end of the file's last `line_annot` span.
        context_lines: number of lines of context kept on each side of a
            changed block when `context_diff` is true.  Defaults to 5, the
            value that used to be hard-coded.

    In both modes, a file's `drop_irrelevant_lines` (when not `None`) is
    subtracted from its kept lines.
    '''
    # Work out which lines each file wants to keep on its own.
    def calc_file_keep(f, is_new):
        if context_diff:
            keep = context_annot(d.blocks, is_new, context_lines)
            if f.keep_mark_lines is not None:
                keep = merge_annot(keep, f.keep_mark_lines)
        elif len(f.line_annot) > 0:
            keep = [Span(0, f.line_annot[-1].end)]
        else:
            keep = []
        if f.drop_irrelevant_lines is not None:
            keep = sub_annot(keep, f.drop_irrelevant_lines)
        return keep

    keep_old = calc_file_keep(d.old_file, False)
    keep_new = calc_file_keep(d.new_file, True)

    # Inside unchanged blocks, lines kept in one file are also kept in the
    # other.  NOTE(review): `cut_annot` presumably yields the kept spans
    # relative to each block's start, which is why the other side's block
    # start is added back -- confirm against `literate.annot`.
    old_blocks, new_blocks = annotate_blocks(d.blocks)
    extra_keep_old = []
    extra_keep_new = []
    for block_span, keep_spans in cut_annot(keep_old, old_blocks):
        if block_span.label.changed:
            continue
        base = block_span.label.new_span.start
        extra_keep_new.extend(s + base for s in keep_spans)
    for block_span, keep_spans in cut_annot(keep_new, new_blocks):
        if block_span.label.changed:
            continue
        base = block_span.label.old_span.start
        extra_keep_old.extend(s + base for s in keep_spans)

    keep_old = merge_annot(keep_old, extra_keep_old)
    keep_new = merge_annot(keep_new, extra_keep_new)

    # Rebuild the block list, restricted to the kept lines.
    blocks = []
    for (old_block, old_keeps), (new_block, new_keeps) in zip(
            cut_annot(keep_old, old_blocks),
            cut_annot(keep_new, new_blocks)):
        # Both annotations were built from the same block list, so entries at
        # the same position must carry the same block.
        assert old_block.label is new_block.label
        block = old_block.label

        # Pair up the kept pieces of the two sides by position.
        for old_keep, new_keep in zip(old_keeps, new_keeps):
            blocks.append(DiffBlock(block.changed,
                old_keep + block.old_span.start,
                new_keep + block.new_span.start))
        # Leftover pieces on one side are paired with an empty span at the
        # end of the block on the other side.
        for old_keep in old_keeps[len(new_keeps):]:
            blocks.append(DiffBlock(block.changed,
                old_keep + block.old_span.start,
                Span(block.new_span.end, block.new_span.end)))
        for new_keep in new_keeps[len(old_keeps):]:
            blocks.append(DiffBlock(block.changed,
                Span(block.old_span.end, block.old_span.end),
                new_keep + block.new_span.start))

    hunks = split_hunks(blocks)
    d.set_hunks(hunks)
def hunk_output_lines(h: Hunk) -> List[OutputLine]:
    '''List the `OutputLine` rows for every block of hunk `h`.

    Within a block, old and new lines are paired by position for as many
    lines as both sides have.  The remaining lines of the longer side follow,
    with `None` standing in for the missing side.  Each row carries the
    block's `changed` flag.
    '''
    rows = []
    for changed, old_span, new_span in h.blocks:
        old_count = len(old_span)
        new_count = len(new_span)
        paired = min(old_count, new_count)

        rows.extend(
            OutputLine(changed, old_span.start + k, new_span.start + k)
            for k in range(paired))
        rows.extend(
            OutputLine(changed, old_span.start + k, None)
            for k in range(paired, old_count))
        rows.extend(
            OutputLine(changed, None, new_span.start + k)
            for k in range(paired, new_count))
    return rows
def build_output_lines(d: Diff):
    '''Compute the output lines of every hunk in `d.hunks` with
    `hunk_output_lines` and store them on the hunk via `set_output_lines`.'''
    for hunk in d.hunks:
        hunk.set_output_lines(hunk_output_lines(hunk))