# nlprule-build 0.6.1
# Build tools for a fast, low-resource Natural Language Processing and Error Correction library.
"""
Converts chunker from binary format to something we can work with.

A port of the relevant *Reader.java files from OpenNLP.
E. g. https://github.com/apache/opennlp/blob/master/opennlp-tools/src/main/java/opennlp/tools/ml/maxent/io/GISModelReader.java
"""

import json
import struct
import zipfile
from io import BytesIO
from xml.etree import ElementTree


# Minimal port of java.io.DataInputStream: all multi-byte values are
# big-endian, matching what the Java model writers produce.
# See https://stackoverflow.com/a/29526850
class DataInputStream:
    def __init__(self, stream):
        self.stream = stream

    def read_boolean(self):
        return struct.unpack("?", self.stream.read(1))[0]

    def read_byte(self):
        return struct.unpack("b", self.stream.read(1))[0]

    def read_unsigned_byte(self):
        return struct.unpack("B", self.stream.read(1))[0]

    def read_char(self):
        return chr(struct.unpack(">H", self.stream.read(2))[0])

    def read_double(self):
        return struct.unpack(">d", self.stream.read(8))[0]

    def read_float(self):
        return struct.unpack(">f", self.stream.read(4))[0]

    def read_short(self):
        return struct.unpack(">h", self.stream.read(2))[0]

    def read_unsigned_short(self):
        return struct.unpack(">H", self.stream.read(2))[0]

    def read_long(self):
        return struct.unpack(">q", self.stream.read(8))[0]

    def read_utf(self):
        # Java writeUTF: a 2-byte length prefix followed by (modified) UTF-8 bytes
        utf_length = struct.unpack(">H", self.stream.read(2))[0]
        return self.stream.read(utf_length).decode("utf-8")

    def read_int(self):
        return struct.unpack(">i", self.stream.read(4))[0]


def read_model(stream):
    stream = DataInputStream(stream)

    # model type identifier; only GIS maxent models are supported
    name = stream.read_utf()
    assert name == "GIS"

    # read correction constant (not used anymore)
    stream.read_int()
    # read correction parameter (not used anymore)
    stream.read_double()

    # each list is prefixed with its length as a 4-byte int
    outcome_labels = [stream.read_utf() for _ in range(stream.read_int())]
    # an outcome pattern is a space-separated list of ints: the number of
    # predicates that use it, followed by the outcome indices it covers
    outcome_patterns = [
        [int(x) for x in stream.read_utf().split(" ")] for _ in range(stream.read_int())
    ]
    pred_labels = [stream.read_utf() for _ in range(stream.read_int())]

    params = []
    for pattern in outcome_patterns:
        # pattern[0] is the number of predicates sharing this pattern;
        # the remaining entries are the outcome indices the parameters apply to
        outcome_pattern = pattern[1:]

        for _ in range(pattern[0]):
            context_parameters = [
                stream.read_double() for _ in range(len(outcome_pattern))
            ]
            params.append(
                {"outcomes": outcome_pattern, "parameters": context_parameters}
            )

    # map each predicate label to its outcomes and parameters
    pmap = dict(zip(pred_labels, params))

    return {
        # "params": params, # we don't need these for inference
        # "pred_labels": pred_labels,
        "outcome_labels": outcome_labels,
        "pmap": pmap,
    }
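
# The returned structure is plain, JSON-serializable data. For a toy model it
# might look like this (labels and values are illustrative, not taken from a
# real model):
#
#     {
#         "outcome_labels": ["B-NP", "I-NP"],
#         "pmap": {
#             "w=the": {"outcomes": [0, 1], "parameters": [0.25, -0.1]},
#         },
#     }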


def write_chunker(outfile, token_model_path, pos_model_path, chunk_model_path):
    with zipfile.ZipFile(token_model_path, "r") as f:
        token_model = read_model(BytesIO(f.read("token.model")))

    with zipfile.ZipFile(pos_model_path, "r") as f:
        pos_model = read_model(BytesIO(f.read("pos.model")))

        tagdict = {}

        # the tag dictionary maps each token to the POS tags it may take
        tree = ElementTree.fromstring(f.read("tags.tagdict").decode("utf-8"))
        for e in tree.findall("entry"):
            children = e.findall("token")
            assert len(children) == 1

            tagdict[children[0].text] = e.attrib["tags"].split()

    with zipfile.ZipFile(chunk_model_path, "r") as f:
        chunk_model = read_model(BytesIO(f.read("chunker.model")))

    with open(outfile, "w") as f:
        json.dump(
            {
                "token_model": token_model,
                "pos_model": pos_model,
                "pos_tagdict": tagdict,
                "chunk_model": chunk_model,
            },
            f,
            indent=4,
        )
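

# A minimal command-line entry point (a sketch; the file names below are
# illustrative and not fixed by this script). OpenNLP distributes its models
# as zipped binary files, which is what write_chunker expects:
#
#     python chunker.py chunker.json en-token.bin en-pos-maxent.bin en-chunker.bin
if __name__ == "__main__":
    import sys

    write_chunker(*sys.argv[1:])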