nested-text 0.1.0

A fully spec-compliant NestedText v3.8 parser and serializer
Documentation
#!/usr/bin/env python3
"""
Convert

usage:
    convert [<tests.nt>]

Converts the human-friendly test definitions file (default is tests.nt) to the 
machine-friendly tests file (default is tests.json).  Running this command is 
only necessary if you change the test definitions file.
"""


# IMPORTS {{{1
import nestedtext as nt
try:
    from nestedtext.nestedtext import Lines
except ImportError:
    # To include line types in the generated json file we need a direct link to
    # NestedText Python implementation.  Create a symbolic link from this 
    # directory to the source directory:
    #     ln -s .../nestedtext/nestedtext/nestedtext.py .
    warn(
        "could not import internal NestedText Lines class,",
        "line types are not available."
    )
    Lines = None
from base64 import b64encode, b64decode
from collections import defaultdict
from docopt import docopt
from inform import (
    Error, fatal, indent, is_str, is_mapping, is_collection, os_error, 
    terminate, warn
)
from itertools import batched
from pathlib import Path
from voluptuous import (
    Schema, Optional, Required, Any, Self, Invalid, MultipleInvalid
)
from voluptuous_errors import report_voluptuous_errors
import codecs
import io
import re
import json


# SCHEMA {{{1
def as_string_with_escapes(arg):
    if not is_str(arg):
        raise Invalid("expected string.")
    return arg.encode('ascii', errors='backslashreplace').decode('unicode-escape')

hierarchy_with_escapes = Schema(
    Any(as_string_with_escapes, [Self], {as_string_with_escapes: Self})
)

def evaluate(arg):
    # use Python to evaluate argument if it begins with !
    if is_str(arg):
        arg = arg.strip()
        if arg == 'None':
            return None
        if arg[0:1].strip() == '!':
            return eval(arg[1:])
    raise Invalid("expected ‘None’ or string that starts with ‘!’.")

def as_index(arg):
    if arg == 'None':
        return None
    return int(arg)

tests_validator = Schema({
    str: dict(
        description = str,
        string_in = str,
        bytes_in = str,
        encoding = str,
        load_out = Any(evaluate, hierarchy_with_escapes),
        load_err = dict(
            message = str,
            line = as_string_with_escapes,
            lineno = as_index,
            colno = as_index,
        ),
    )
})

# UTILITIES {{{1
# process backslash escapes {{{2
nl_ptn = re.compile(rb'((?:\\n|\\r){1,2})\n')

def fix_eol(match):
    # remove implicitly added LF
    stripped_of_lf = match.group(0).replace(b'\n', b'')

    # replace explicitly specified \r and \n with CR and LF
    return stripped_of_lf.replace(rb'\r', b'\r').replace(rb'\n', b'\n')

# extract line types {{{1
if Lines:
    def extract_line_types(content):
        try:
            content = b64decode(content).decode('utf-8-sig', errors='strict')
        except UnicodeError as e:
            return dict(unrecognized = 1)

        types = defaultdict(int)
        lines = Lines([], True)
        lines.lines = io.StringIO(content, newline=None)

        for line in lines.read_lines():
            types[line.kind] += 1

        return types
else:
    def extract_line_types(text):
        return {}

# encode() {{{2
def utf8_encode(given, encoding):
    # interpret escape sequences, convert to base64

    # oddly, unicode-escape cannot handle unicode, so convert to ascii
    bytes = given.encode('ascii', errors='backslashreplace')  # convert to bytes
    bytes = nl_ptn.sub(fix_eol, bytes)  # remove excess newlines
    text = bytes.decode('unicode-escape')  # expand escape sequences
    bytes = text.encode(encoding)  # encode with desired encoding
    return b64encode(bytes).decode('ascii')  # encode in base64

def bytes_encode(given, encoding):
    bytes = given.encode('ascii', errors='strict')  # convert to bytes
    bytes = nl_ptn.sub(fix_eol, bytes)  # remove excess newlines
    bytes, _ = codecs.escape_decode(bytes)
        # unfortunately, escape_decode is an undocumented private function
        # if this becomes a problem this line should probably be replaced with a
        # regular expression substitution that matches and maps the normal ASCII 
        # escape sequences and the binary escape sequences (\x00 to \xFF).
    return b64encode(bytes).decode('ascii')  # encode in base64


# CONVERT {{{1
# read command line {{{1
cmdline = docopt(__doc__)
input_path = Path(cmdline['<tests.nt>'] or 'tests.nt')
output_path = input_path.with_suffix('.json')

# read test definitions file (tests.nt) {{{1
try:
    keymap = {}
    tests = nt.load(input_path, keymap=keymap)
    tests = tests_validator(tests)
except OSError as e:
    fatal(os_error(e))
except nt.NestedTextError as e:
    e.terminate()
except MultipleInvalid as e:
    report_voluptuous_errors(e, keymap, source='tests.nt')
    terminate()


# process tests {{{1
processed = {}
accumulated_line_types = defaultdict(int)
try:
    for key, fields in tests.items():
        if 'string_in' not in fields and 'bytes_in' not in fields:
            warn("‘string_in’ is missing.", culprit=key)
            continue

        if 'string_in' in fields and 'bytes_in' in fields:
            warn("must not have both ‘string_in’ and ‘bytes_in’ fields.", culprit=key)
            continue

        # process string_in or bytes_in
        encoding = fields.get('encoding', 'utf-8')
        try:
            if 'string_in' in fields:
                load_in = fields.get('string_in')
                load_in_encoded = utf8_encode(load_in, encoding)
            else:
                load_in = fields.get('bytes_in')
                load_in_encoded = bytes_encode(load_in, encoding)
        except (UnicodeEncodeError, UnicodeDecodeError) as e:
            raise Error(e, culprit=key)

        load_out = fields.get('load_out')
        load_err = fields.get("load_err", {})
        if load_out and load_err:
            raise Error("must not specify both ‘load_out’ and ‘load_err’.")

        processed_test = dict(
            load_in = load_in_encoded,
            load_out = load_out,
            load_err = load_err,
            encoding = encoding,
            types = extract_line_types(load_in_encoded)
        )
        for line_type, count in processed_test['types'].items():
            accumulated_line_types[line_type] += count

        if 'unrecognized' in processed_test['types'] and not load_err:
            # raise Error("unrecognized line in a test that does not expect an error.")
            warn("unrecognized line in a test that does not expect an error.", 
                 culprit=key)

        processed[key] = processed_test

    # write tests.json {{{1
    content = json.dumps(dict(load_tests=processed), indent=4, ensure_ascii=False)
    output_path.write_text(content + '\n', encoding="utf-8")

except OSError as e:
    fatal(os_error(e))
except Error as e:
    e.terminate(culprit=e.get_culprit(key))

if accumulated_line_types:
    print("Count of line types found:")
    print(indent(nt.dumps(accumulated_line_types, sort_keys=True)))
    print()
print(f"Number of tests: {len(tests)}")