import numpy as np
import pytest
import test_utils

import ctranslate2

from ctranslate2.converters import utils as conversion_utils
from ctranslate2.specs import common_spec, transformer_spec
from ctranslate2.specs.model_spec import OPTIONAL, index_spec


def test_layer_spec_validate():
class SubSpec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.a = np.ones([5], dtype=np.float16)

    class Spec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.a = np.zeros([5], dtype=np.float32)
self.b = np.zeros([5], dtype=np.float16)
self.c = np.zeros([5], dtype=np.int32)
self.d = OPTIONAL
self.e = SubSpec()
self.f = True
self.g = "hello"

    spec = Spec()
spec.validate()
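
    # After validation, attributes are wrapped and expose a canonical dtype string.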
assert spec.a.dtype == "float32"
assert spec.b.dtype == "float16"
assert spec.c.dtype == "int32"
assert spec.d == OPTIONAL
assert spec.e.a.dtype == "float16"
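
    # Non-array values are converted too: booleans and strings are stored as int8 tensors.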
assert test_utils.array_equal(spec.f.numpy(), np.int8(1))
assert test_utils.array_equal(
spec.g.numpy(), np.array([104, 101, 108, 108, 111], dtype=np.int8)
)

    with pytest.raises(AttributeError, match="Attribute z does not exist"):
spec.z = True


def test_layer_spec_validate_unset():
class SubSpec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.attr_1 = None

    class Spec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.attr_1 = np.zeros([5], dtype=np.float32)
self.attr_2 = None
self.attr_3 = SubSpec()

    spec = Spec()
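    # Unset (None) attributes are reported with their full dotted path.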
with pytest.raises(ValueError, match="attr_2\nattr_3.attr_1"):
spec.validate()


def test_layer_spec_optimize():
class SubSpec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.a = np.ones([6], dtype=np.float32)
self.weight = np.ones([5, 4], dtype=np.float32)
self.weight_scale = OPTIONAL

    class Spec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.a = np.ones([5], dtype=np.float32)
self.b = np.ones([5], dtype=np.float32)
self.c = np.zeros([5], dtype=np.int32)
self.d = np.dtype("float32").type(3.14)
self.sub = SubSpec()

    spec = Spec()
spec.validate()
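
    # int16 quantization converts weights to int16 and adds a float32 scale;
    # duplicated arrays are replaced by an alias to the first occurrence ("a").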
spec.optimize(quantization="int16")
assert spec.a.dtype == "float32"
assert spec.b == "a"
assert spec.c.dtype == "int32"
assert spec.d.dtype == "float32"
assert spec.sub.weight.dtype == "int16"
assert spec.sub.weight_scale.dtype == "float32"

    spec = Spec()
spec.validate()
spec.optimize(quantization="float16")
assert spec.a.dtype == "float16"
assert spec.b == "a"
assert spec.c.dtype == "int32"
assert spec.d.dtype == "float32"
assert spec.sub.weight.dtype == "float16"
assert spec.sub.a.dtype == "float16"

    spec = Spec()
spec.validate()
with pytest.raises(ValueError, match="not a valid quantization type"):
spec.optimize(quantization="int32")


def test_int8_quantization():
class Spec(ctranslate2.specs.LayerSpec):
def __init__(self):
self.weight = np.array([[-10, -3, 5, 2], [0, 0, 0, 0]], dtype=np.float32)
self.weight_scale = OPTIONAL

    spec = Spec()
spec.validate()
spec.optimize(quantization="int8")
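
    # int8 quantization uses a per-row scale of 127 / max(|row|);
    # all-zero rows fall back to a scale of 1.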
assert test_utils.array_equal(
spec.weight.numpy(),
np.array([[-127, -38, 64, 25], [0, 0, 0, 0]], dtype=np.int8),
)
assert test_utils.array_equal(
spec.weight_scale.numpy(), np.array([12.7, 1], dtype=np.float32)
)


@pytest.mark.parametrize(
"quantization,expected_weight,expected_weight_scale,expected_bias",
[
(
None,
np.array([[-10, -3, 5, 2]], dtype=np.float16),
None,
np.array([4], dtype=np.float16),
),
(
"float32",
np.array([[-10, -3, 5, 2]], dtype=np.float32),
None,
np.array([4], dtype=np.float32),
),
(
"float16",
np.array([[-10, -3, 5, 2]], dtype=np.float16),
None,
np.array([4], dtype=np.float16),
),
(
"int8",
np.array([[-127, -38, 64, 25]], dtype=np.int8),
np.array([12.7], dtype=np.float32),
np.array([4], dtype=np.float16),
),
(
"int8_float16",
np.array([[-127, -38, 64, 25]], dtype=np.int8),
np.array([12.7], dtype=np.float32),
np.array([4], dtype=np.float16),
),
(
"int8_float32",
np.array([[-127, -38, 64, 25]], dtype=np.int8),
np.array([12.7], dtype=np.float32),
np.array([4], dtype=np.float32),
),
(
"int16",
np.array([[-1024, -307, 512, 205]], dtype=np.int16),
np.float32(102.4),
np.array([4], dtype=np.float32),
),
],
)
def test_fp16_weights(
quantization, expected_weight, expected_weight_scale, expected_bias
):
class Spec(ctranslate2.specs.LayerSpec):
def __init__(self, weight, bias):
self.weight = weight
self.weight_scale = OPTIONAL
self.bias = bias

    weight = np.array([[-10, -3, 5, 2]], dtype=np.float16)
bias = np.array([4], dtype=np.float16)
spec = Spec(weight, bias)
spec.validate()
spec.optimize(quantization=quantization)
assert test_utils.array_equal(spec.weight.numpy(), expected_weight)
assert test_utils.array_equal(spec.bias.numpy(), expected_bias)
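
    # Conversions that leave the dtype unchanged should keep the original arrays.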
if quantization in (None, "float16"):
assert spec.weight.numpy() is weight
assert spec.bias.numpy() is bias
elif quantization in ("int8", "int8_float16"):
assert spec.bias.numpy() is bias

    if expected_weight_scale is None:
assert spec.weight_scale == OPTIONAL
else:
        assert test_utils.array_equal(
            spec.weight_scale.numpy(), expected_weight_scale
        )


def test_index_spec():
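    # index_spec resolves a "/"-separated path to a nested layer spec.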
spec = ctranslate2.specs.TransformerSpec.from_config(6, 8)
assert isinstance(
index_spec(spec, "encoder/layer_5"),
transformer_spec.TransformerEncoderLayerSpec,
)
assert isinstance(
index_spec(spec, "encoder/layer_5/ffn"), transformer_spec.FeedForwardSpec
)


def test_fuse_linear_no_bias():
layers = []
for _ in range(3):
spec = common_spec.LinearSpec()
spec.weight = np.zeros([64, 64], dtype=np.float32)
layers.append(spec)
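
    # fuse_linear concatenates the layer weights along the output dimension.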
spec = common_spec.LinearSpec()
conversion_utils.fuse_linear(spec, layers)
assert spec.weight.shape[0] == 64 * 3
assert spec.bias == OPTIONAL
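
    # When only some layers have a bias, missing biases are zero-filled.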
spec = common_spec.LinearSpec()
layers[1].bias = np.ones([64], dtype=np.float32)
conversion_utils.fuse_linear(spec, layers)
assert test_utils.array_equal(spec.bias[:64], np.zeros([64], dtype=np.float32))
assert test_utils.array_equal(spec.bias[64:128], np.ones([64], dtype=np.float32))
assert test_utils.array_equal(spec.bias[128:], np.zeros([64], dtype=np.float32))


@test_utils.skip_on_windows
def test_fuse_linear_torch():
import torch

    layers = []
for _ in range(3):
spec = common_spec.LinearSpec()
spec.weight = torch.zeros([64, 64], dtype=torch.float32)
spec.bias = torch.zeros([64], dtype=torch.float32)
layers.append(spec)

    spec = common_spec.LinearSpec()
conversion_utils.fuse_linear(spec, layers)
assert spec.weight.shape[0] == 64 * 3
assert spec.bias.shape[0] == 64 * 3


@test_utils.skip_on_windows
def test_smooth_activation_torch():
import torch

    layer_norm = common_spec.LayerNormSpec()
layer_norm.beta = torch.rand([64], dtype=torch.float16)
layer_norm.gamma = torch.rand([64], dtype=torch.float16)
linear = common_spec.LinearSpec()
linear.weight = torch.rand([64, 64], dtype=torch.float16)
activation_scales = torch.rand([64], dtype=torch.float32)
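
    # Smoke test: smoothing should complete without raising on float16 specs.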
conversion_utils.smooth_activation(layer_norm, linear, activation_scales)


@test_utils.skip_on_windows
@pytest.mark.parametrize("variable_dtype", ["float32", "float16", "bfloat16"])
@pytest.mark.parametrize(
"quantization,expected_weight_dtype,expected_bias_dtype",
[
(None, None, None),
("int8", "int8", None),
("int8_float32", "int8", "float32"),
("int8_float16", "int8", "float16"),
("int8_bfloat16", "int8", "bfloat16"),
("int16", "int16", "float32"),
("float16", "float16", "float16"),
("bfloat16", "bfloat16", "bfloat16"),
("float32", "float32", "float32"),
],
)
def test_torch_variables(
tmp_dir, variable_dtype, quantization, expected_weight_dtype, expected_bias_dtype
):
import torch
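
    # A quantization that does not force a dtype keeps the variable's original dtype.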
if expected_weight_dtype is None:
expected_weight_dtype = variable_dtype
if expected_bias_dtype is None:
expected_bias_dtype = variable_dtype
variable_dtype = getattr(torch, variable_dtype)
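
    # Minimal model spec with a single linear layer in the requested dtype.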
class TorchModel(ctranslate2.specs.ModelSpec):
def __init__(self):
super().__init__()
self.dense = common_spec.LinearSpec()
self.dense.weight = torch.ones([16, 4], dtype=variable_dtype)
self.dense.bias = torch.ones([16], dtype=variable_dtype)

        @property
def name(self):
return "TorchModel"

    model = TorchModel()
model.validate()
model.optimize(quantization)
variables = model.variables()
assert variables["dense/weight"].dtype == expected_weight_dtype
assert variables["dense/bias"].dtype == expected_bias_dtype
model.save(tmp_dir)