ct2rs 0.9.18

Rust bindings for OpenNMT/CTranslate2
Documentation
import numpy as np
import pytest
import test_utils

import ctranslate2

from ctranslate2.converters import utils as conversion_utils
from ctranslate2.specs import common_spec, transformer_spec
from ctranslate2.specs.model_spec import OPTIONAL, index_spec


def test_layer_spec_validate():
    class SubSpec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.a = np.ones([5], dtype=np.float16)

    class Spec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.a = np.zeros([5], dtype=np.float32)
            self.b = np.zeros([5], dtype=np.float16)
            self.c = np.zeros([5], dtype=np.int32)
            self.d = OPTIONAL
            self.e = SubSpec()
            self.f = True
            self.g = "hello"

    spec = Spec()
    spec.validate()
    assert spec.a.dtype == "float32"
    assert spec.b.dtype == "float16"
    assert spec.c.dtype == "int32"
    assert spec.d == OPTIONAL
    assert spec.e.a.dtype == "float16"
    assert test_utils.array_equal(spec.f.numpy(), np.int8(1))
    assert test_utils.array_equal(
        spec.g.numpy(), np.array([104, 101, 108, 108, 111], dtype=np.int8)
    )

    with pytest.raises(AttributeError, match="Attribute z does not exist"):
        spec.z = True


def test_layer_spec_validate_unset():
    class SubSpec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.attr_1 = None

    class Spec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.attr_1 = np.zeros([5], dtype=np.float32)
            self.attr_2 = None
            self.attr_3 = SubSpec()

    spec = Spec()

    with pytest.raises(ValueError, match="attr_2\nattr_3.attr_1"):
        spec.validate()


def test_layer_spec_optimize():
    class SubSpec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.a = np.ones([6], dtype=np.float32)
            self.weight = np.ones([5, 4], dtype=np.float32)
            self.weight_scale = OPTIONAL

    class Spec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.a = np.ones([5], dtype=np.float32)
            self.b = np.ones([5], dtype=np.float32)
            self.c = np.zeros([5], dtype=np.int32)
            self.d = np.dtype("float32").type(3.14)
            self.sub = SubSpec()

    spec = Spec()
    spec.validate()
    spec.optimize(quantization="int16")
    assert spec.a.dtype == "float32"
    assert spec.b == "a"
    assert spec.c.dtype == "int32"
    assert spec.d.dtype == "float32"
    assert spec.sub.weight.dtype == "int16"
    assert spec.sub.weight_scale.dtype == "float32"

    spec = Spec()
    spec.validate()
    spec.optimize(quantization="float16")
    assert spec.a.dtype == "float16"
    assert spec.b == "a"
    assert spec.c.dtype == "int32"
    assert spec.d.dtype == "float32"
    assert spec.sub.weight.dtype == "float16"
    assert spec.sub.a.dtype == "float16"

    spec = Spec()
    spec.validate()
    with pytest.raises(ValueError, match="not a valid quantization type"):
        spec.optimize(quantization="int32")


def test_int8_quantization():
    class Spec(ctranslate2.specs.LayerSpec):
        def __init__(self):
            self.weight = np.array([[-10, -3, 5, 2], [0, 0, 0, 0]], dtype=np.float32)
            self.weight_scale = OPTIONAL

    spec = Spec()
    spec.validate()
    spec.optimize(quantization="int8")
    assert test_utils.array_equal(
        spec.weight.numpy(),
        np.array([[-127, -38, 64, 25], [0, 0, 0, 0]], dtype=np.int8),
    )
    assert test_utils.array_equal(
        spec.weight_scale.numpy(), np.array([12.7, 1], dtype=np.float32)
    )


@pytest.mark.parametrize(
    "quantization,expected_weight,expected_weight_scale,expected_bias",
    [
        (
            None,
            np.array([[-10, -3, 5, 2]], dtype=np.float16),
            None,
            np.array([4], dtype=np.float16),
        ),
        (
            "float32",
            np.array([[-10, -3, 5, 2]], dtype=np.float32),
            None,
            np.array([4], dtype=np.float32),
        ),
        (
            "float16",
            np.array([[-10, -3, 5, 2]], dtype=np.float16),
            None,
            np.array([4], dtype=np.float16),
        ),
        (
            "int8",
            np.array([[-127, -38, 64, 25]], dtype=np.int8),
            np.array([12.7], dtype=np.float32),
            np.array([4], dtype=np.float16),
        ),
        (
            "int8_float16",
            np.array([[-127, -38, 64, 25]], dtype=np.int8),
            np.array([12.7], dtype=np.float32),
            np.array([4], dtype=np.float16),
        ),
        (
            "int8_float32",
            np.array([[-127, -38, 64, 25]], dtype=np.int8),
            np.array([12.7], dtype=np.float32),
            np.array([4], dtype=np.float32),
        ),
        (
            "int16",
            np.array([[-1024, -307, 512, 205]], dtype=np.int16),
            np.float32(102.4),
            np.array([4], dtype=np.float32),
        ),
    ],
)
def test_fp16_weights(
    quantization, expected_weight, expected_weight_scale, expected_bias
):
    class Spec(ctranslate2.specs.LayerSpec):
        def __init__(self, weight, bias):
            self.weight = weight
            self.weight_scale = OPTIONAL
            self.bias = bias

    weight = np.array([[-10, -3, 5, 2]], dtype=np.float16)
    bias = np.array([4], dtype=np.float16)

    spec = Spec(weight, bias)
    spec.validate()
    spec.optimize(quantization=quantization)

    assert test_utils.array_equal(spec.weight.numpy(), expected_weight)
    assert test_utils.array_equal(spec.bias.numpy(), expected_bias)

    # Check the weights were not copied or converted.
    if quantization in (None, "float16"):
        assert spec.weight.numpy() is weight
        assert spec.bias.numpy() is bias
    elif quantization in ("int8", "int8_float16"):
        assert spec.bias.numpy() is bias

    if expected_weight_scale is None:
        assert spec.weight_scale == OPTIONAL
    else:
        assert test_utils.array_equal(spec.weight_scale.numpy(), expected_weight_scale)


def test_index_spec():
    spec = ctranslate2.specs.TransformerSpec.from_config(6, 8)
    assert isinstance(
        index_spec(spec, "encoder/layer_5"),
        transformer_spec.TransformerEncoderLayerSpec,
    )
    assert isinstance(
        index_spec(spec, "encoder/layer_5/ffn"), transformer_spec.FeedForwardSpec
    )


def test_fuse_linear_no_bias():
    layers = []
    for _ in range(3):
        spec = common_spec.LinearSpec()
        spec.weight = np.zeros([64, 64], dtype=np.float32)
        layers.append(spec)

    spec = common_spec.LinearSpec()
    conversion_utils.fuse_linear(spec, layers)
    assert spec.weight.shape[0] == 64 * 3
    assert spec.bias == OPTIONAL

    spec = common_spec.LinearSpec()
    layers[1].bias = np.ones([64], dtype=np.float32)
    conversion_utils.fuse_linear(spec, layers)
    assert test_utils.array_equal(spec.bias[:64], np.zeros([64], dtype=np.float32))
    assert test_utils.array_equal(spec.bias[64:128], np.ones([64], dtype=np.float32))
    assert test_utils.array_equal(spec.bias[128:], np.zeros([64], dtype=np.float32))


@test_utils.skip_on_windows
def test_fuse_linear_torch():
    import torch

    layers = []
    for _ in range(3):
        spec = common_spec.LinearSpec()
        spec.weight = torch.zeros([64, 64], dtype=torch.float32)
        spec.bias = torch.zeros([64], dtype=torch.float32)
        layers.append(spec)

    spec = common_spec.LinearSpec()
    conversion_utils.fuse_linear(spec, layers)
    assert spec.weight.shape[0] == 64 * 3
    assert spec.bias.shape[0] == 64 * 3


@test_utils.skip_on_windows
def test_smooth_activation_torch():
    import torch

    layer_norm = common_spec.LayerNormSpec()
    layer_norm.beta = torch.rand([64], dtype=torch.float16)
    layer_norm.gamma = torch.rand([64], dtype=torch.float16)

    linear = common_spec.LinearSpec()
    linear.weight = torch.rand([64, 64], dtype=torch.float16)

    activation_scales = torch.rand([64], dtype=torch.float32)

    # Just check that no error is raised.
    conversion_utils.smooth_activation(layer_norm, linear, activation_scales)


@test_utils.skip_on_windows
@pytest.mark.parametrize("variable_dtype", ["float32", "float16", "bfloat16"])
@pytest.mark.parametrize(
    "quantization,expected_weight_dtype,expected_bias_dtype",
    [
        (None, None, None),
        ("int8", "int8", None),
        ("int8_float32", "int8", "float32"),
        ("int8_float16", "int8", "float16"),
        ("int8_bfloat16", "int8", "bfloat16"),
        ("int16", "int16", "float32"),
        ("float16", "float16", "float16"),
        ("bfloat16", "bfloat16", "bfloat16"),
        ("float32", "float32", "float32"),
    ],
)
def test_torch_variables(
    tmp_dir, variable_dtype, quantization, expected_weight_dtype, expected_bias_dtype
):
    import torch

    if expected_weight_dtype is None:
        expected_weight_dtype = variable_dtype
    if expected_bias_dtype is None:
        expected_bias_dtype = variable_dtype

    variable_dtype = getattr(torch, variable_dtype)

    class TorchModel(ctranslate2.specs.ModelSpec):
        def __init__(self):
            super().__init__()
            self.dense = common_spec.LinearSpec()
            self.dense.weight = torch.ones([16, 4], dtype=variable_dtype)
            self.dense.bias = torch.ones([16], dtype=variable_dtype)

        @property
        def name(self):
            return "TorchModel"

    model = TorchModel()
    model.validate()
    model.optimize(quantization)

    variables = model.variables()
    assert variables["dense/weight"].dtype == expected_weight_dtype
    assert variables["dense/bias"].dtype == expected_bias_dtype

    model.save(tmp_dir)