# stdlib
import codecs
import io
import json
import re
from base64 import b64encode, b64decode
from collections import defaultdict
from itertools import batched
from pathlib import Path

# third-party
import nestedtext as nt
from docopt import docopt
from inform import (
    Error, fatal, indent, is_str, is_mapping, is_collection, os_error,
    terminate, warn
)
from voluptuous import (
    Schema, Optional, Required, Any, Self, Invalid, MultipleInvalid
)
from voluptuous_errors import report_voluptuous_errors

# Fall back gracefully if the internal Lines class is unavailable.
# NOTE: this must come after the inform import — the handler calls warn(),
# which previously was imported later and would have raised NameError.
try:
    from nestedtext.nestedtext import Lines
except ImportError:
    warn(
        "could not import internal NestedText Lines class,",
        "line types are not available."
    )
    Lines = None
def as_string_with_escapes(arg):
    """Validator: accept only strings, realizing backslash escape sequences.

    Raises Invalid when *arg* is not a string.
    """
    if not is_str(arg):
        raise Invalid("expected string.")
    as_ascii = arg.encode('ascii', errors='backslashreplace')
    return as_ascii.decode('unicode-escape')
# Recursive schema for a NestedText-shaped hierarchy: a string (with escape
# sequences realized), a list of hierarchies, or a dict of string keys to
# hierarchies.  Self refers back to this schema.
hierarchy_with_escapes = Schema(
    Any(as_string_with_escapes, [Self], {as_string_with_escapes: Self})
)
def evaluate(arg):
    """Validator: interpret ‘None’ or a ‘!’-prefixed Python expression.

    A bare ‘None’ yields None; a string starting with ‘!’ has the rest
    evaluated as a Python expression (trusted test-fixture input only —
    never feed this untrusted data).  Anything else is rejected.
    """
    if not is_str(arg):
        raise Invalid("expected ‘None’ or string that starts with ‘!’.")
    text = arg.strip()
    if text == 'None':
        return None
    if text.startswith('!'):
        return eval(text[1:])
    raise Invalid("expected ‘None’ or string that starts with ‘!’.")
def as_index(arg):
    """Validator: convert to an int, mapping the string ‘None’ to None."""
    return None if arg == 'None' else int(arg)
# Schema for the whole test file: a mapping from test name to its fields.
# NOTE(review): keys inside dict(...) are presumably optional by default
# (voluptuous Schema without required=True) — confirm against voluptuous docs.
tests_validator = Schema({
    str: dict(
        description = str,      # human-readable summary of the test
        string_in = str,        # input given as text (escape sequences allowed)
        bytes_in = str,         # input given as raw bytes (escape sequences allowed)
        encoding = str,         # input encoding; callers default to utf-8
        load_out = Any(evaluate, hierarchy_with_escapes),  # expected load result
        load_err = dict(        # expected error, if the load should fail
            message = str,
            line = as_string_with_escapes,
            lineno = as_index,
            colno = as_index,
        ),
    )
})
# Matches one or two literal escape sequences (the two characters ‘\n’ or
# ‘\r’) immediately followed by a physical newline.
nl_ptn = re.compile(rb'((?:\\n|\\r){1,2})\n')

def fix_eol(match):
    """Drop the physical newline and realize the escaped EOL sequences."""
    escapes = match.group(0).replace(b'\n', b'')
    escapes = escapes.replace(rb'\r', b'\r')
    return escapes.replace(rb'\n', b'\n')
if Lines:
    def extract_line_types(content):
        # Decode the base64-encoded test input back to text and tally how
        # many lines of each NestedText line kind it contains.
        try:
            content = b64decode(content).decode('utf-8-sig', errors='strict')
        except UnicodeError as e:
            # not valid UTF-8: report it as a single unrecognized line
            return dict(unrecognized = 1)
        types = defaultdict(int)
        # NOTE(review): reaches into NestedText internals — Lines is built
        # with an empty source, then its line buffer is swapped for a
        # StringIO with universal newlines.  Fragile across NestedText
        # versions; re-verify on upgrade.
        lines = Lines([], True)
        lines.lines = io.StringIO(content, newline=None)
        for line in lines.read_lines():
            types[line.kind] += 1
        return types
else:
    def extract_line_types(text):
        # internal Lines class unavailable: line-type statistics disabled
        return {}
def utf8_encode(given, encoding):
    """Encode a textual test input for the JSON test file.

    *given* is treated as ASCII text containing backslash escape
    sequences: escaped EOLs at physical line ends are realized first,
    the remaining escapes are decoded, the result is encoded with
    *encoding*, and the bytes are returned base64-encoded as ASCII.

    Raises UnicodeEncodeError or UnicodeDecodeError when the input
    cannot be represented in *encoding*.
    """
    # local renamed from ‘bytes’, which shadowed the builtin
    raw = given.encode('ascii', errors='backslashreplace')
    raw = nl_ptn.sub(fix_eol, raw)          # realize escaped EOLs
    text = raw.decode('unicode-escape')     # realize remaining escapes
    raw = text.encode(encoding)
    return b64encode(raw).decode('ascii')
def bytes_encode(given, encoding):
    """Encode a binary test input for the JSON test file.

    *given* must be pure ASCII (backslash escapes denote raw byte
    values): escaped EOLs at physical line ends are realized, the
    remaining escapes are decoded at the byte level, and the bytes are
    returned base64-encoded as ASCII.

    *encoding* is accepted for signature parity with utf8_encode but is
    not used — the escapes already denote exact byte values.

    Raises UnicodeEncodeError when *given* is not pure ASCII.
    """
    # local renamed from ‘bytes’, which shadowed the builtin
    raw = given.encode('ascii', errors='strict')
    raw = nl_ptn.sub(fix_eol, raw)
    # NOTE(review): codecs.escape_decode is an undocumented CPython
    # internal that decodes byte-level backslash escapes — confirm it is
    # still present when upgrading Python.
    raw, _ = codecs.escape_decode(raw)
    return b64encode(raw).decode('ascii')
# Parse the command line (usage text comes from the module docstring) and
# derive the output path from the input path.
cmdline = docopt(__doc__)
input_path = Path(cmdline['<tests.nt>'] or 'tests.nt')
output_path = input_path.with_suffix('.json')

# Load and validate the NestedText test descriptions; keymap lets the
# voluptuous error reporter point back to source locations.
try:
    keymap = {}
    tests = nt.load(input_path, keymap=keymap)
    tests = tests_validator(tests)
except OSError as e:
    fatal(os_error(e))
except nt.NestedTextError as e:
    e.terminate()
except MultipleInvalid as e:
    report_voluptuous_errors(e, keymap, source='tests.nt')
    terminate()
# Convert each validated test into its JSON form, accumulating line-type
# statistics along the way.
processed = {}
accumulated_line_types = defaultdict(int)
try:
    for key, fields in tests.items():
        # each test must provide its input in exactly one of two ways
        if 'string_in' not in fields and 'bytes_in' not in fields:
            warn("‘string_in’ is missing.", culprit=key)
            continue
        if 'string_in' in fields and 'bytes_in' in fields:
            warn("must not have both ‘string_in’ and ‘bytes_in’ fields.", culprit=key)
            continue
        encoding = fields.get('encoding', 'utf-8')
        try:
            if 'string_in' in fields:
                load_in = fields.get('string_in')
                load_in_encoded = utf8_encode(load_in, encoding)
            else:
                load_in = fields.get('bytes_in')
                load_in_encoded = bytes_encode(load_in, encoding)
        except (UnicodeEncodeError, UnicodeDecodeError) as e:
            # re-raise with the test name attached for reporting
            raise Error(e, culprit=key)
        load_out = fields.get('load_out')
        load_err = fields.get("load_err", {})
        # a test either succeeds (load_out) or fails (load_err), not both
        if load_out and load_err:
            raise Error("must not specify both ‘load_out’ and ‘load_err’.")
        processed_test = dict(
            load_in = load_in_encoded,
            load_out = load_out,
            load_err = load_err,
            encoding = encoding,
            types = extract_line_types(load_in_encoded)
        )
        for line_type, count in processed_test['types'].items():
            accumulated_line_types[line_type] += count
        # an unrecognized line should only appear in error-expecting tests
        if 'unrecognized' in processed_test['types'] and not load_err:
            warn("unrecognized line in a test that does not expect an error.",
                culprit=key)
        processed[key] = processed_test
    # write the compiled tests as JSON next to the input file
    content = json.dumps(dict(load_tests=processed), indent=4, ensure_ascii=False)
    output_path.write_text(content + '\n', encoding="utf-8")
except OSError as e:
    fatal(os_error(e))
except Error as e:
    # `key` names the test being processed when the Error was raised —
    # Errors are only raised from inside the loop, so it is always bound here
    e.terminate(culprit=e.get_culprit(key))
# Print a summary: per-kind line counts (when available) and test total.
if accumulated_line_types:
    print("Count of line types found:")
    print(indent(nt.dumps(accumulated_line_types, sort_keys=True)))
    print()
print(f"Number of tests: {len(tests)}")