megenginelite-sys 1.8.2

A safe megenginelite wrapper in Rust
Documentation
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import numpy as np
import pytest

import megengine.functional as F
from megengine import tensor


def test_cross_entropy_with_logits():
    data = tensor([[0, 50], [0, -150]]).astype(np.float32)
    label = tensor([1, 0]).astype(np.int32)
    loss = F.nn.cross_entropy(data, label)
    np.testing.assert_allclose(loss.numpy(), 0.0)
    label = tensor([0, 1]).astype(np.int32)
    loss = F.nn.cross_entropy(data, label)
    np.testing.assert_allclose(loss.numpy(), 100)

    label = np.array([1, 0])
    loss = F.nn.cross_entropy(data, label)
    np.testing.assert_allclose(loss.numpy(), 0.0)


def test_cross_entropy():
    def softmax(x):
        x = np.exp(x)
        x /= x.sum(1, keepdims=True)
        return x

    def ref(x, y):
        return np.mean([-np.log(x[i, y[i]]) for i in range(len(y))])

    x = (np.random.rand(5, 10) - 0.5) * 4
    y = np.random.randint(10, size=(5,))
    for i in range(len(x)):
        x[i, y[i]] += np.random.rand() * 2
    x = softmax(x)
    l_ref = ref(x, y)
    l = F.nn.cross_entropy(tensor(x, "float32"), tensor(y, "int32"), with_logits=False)
    np.testing.assert_allclose(l.numpy(), l_ref, 1e-6, 1e-6)


def test_cross_entropy_reduction():
    logits = np.random.randn(16, 10)
    label = np.random.randint(10, size=[16])
    logits = tensor(logits, dtype="float32")
    label = tensor(label, dtype="int32")

    perm = np.random.permutation(16)
    logits_perm = tensor(logits[perm], dtype="float32")
    label_perm = tensor(label[perm], dtype="int32")

    loss = F.nn.cross_entropy(logits, label, reduction="none")
    loss_perm = F.nn.cross_entropy(logits_perm, label_perm, reduction="none")
    np.testing.assert_allclose(loss.numpy()[perm], loss_perm.numpy())

    loss_sum = F.nn.cross_entropy(logits, label, reduction="sum")
    np.testing.assert_allclose(loss.numpy().sum(), loss_sum.numpy(), rtol=2e-7)

    loss_mean = F.nn.cross_entropy(logits, label, reduction="mean")
    np.testing.assert_allclose(loss_mean.numpy(), loss_sum.numpy() / 16)

    loss_ls = F.nn.cross_entropy(logits, label, reduction="mean", label_smooth=0.1)
    loss_ls_none_reduce = F.nn.cross_entropy(
        logits, label, reduction="none", label_smooth=0.1
    )
    np.testing.assert_allclose(
        loss_ls.numpy(), loss_ls_none_reduce.numpy().mean(), rtol=2e-7
    )

    with pytest.raises(ValueError):
        F.nn.cross_entropy(logits, label, reduction="MEAN")

    with pytest.raises(ValueError):
        F.nn.cross_entropy(logits, label, reduction="max")


def ctc_nll_naive_npy(
    pred,
    pred_lengths,
    label,
    label_lengths,
    blank=0,
    reduction="mean",
    time_major=False,
):
    """naive :func:`ctc_nll` using numpy arrays. Used for testing and helping
    our user to understand how CTC works. Only ``LABEL_COMPACT`` mode is
    supported."""

    pred = np.asarray(pred, dtype=np.float32)
    pred_lengths = np.asarray(pred_lengths, dtype=np.int8)
    label = np.asarray(label, dtype=np.int32)
    label_lengths = np.asarray(label_lengths, dtype=np.int32)

    if time_major:
        pred = np.transpose(pred, (1, 0, 2))
    # pred in (N, T, P) format

    batch_size, time_len, nr_class = pred.shape
    assert pred_lengths.shape == (batch_size,) and pred_lengths.max() <= pred.shape[1]
    assert label_lengths.shape == (batch_size,)
    assert label.shape == (label_lengths.sum(),) and label.max() < nr_class

    ret = np.empty((batch_size,), dtype=np.float32)
    label_start = 0
    for i in range(batch_size):
        label_end = label_start + label_lengths[i]
        ret[i] = _ctc_npy_single_seq(
            pred[i][: pred_lengths[i]], label[label_start:label_end], blank
        )
        label_start = label_end

    if reduction == "mean":
        return (ret / label_lengths).mean()
    elif reduction == "sum":
        return ret.sum()
    elif reduction == "none":
        return ret
    else:
        raise ValueError("{} is not a valid value for reduction".format(reduction))


def _ctc_npy_single_seq(pred, label, blank):
    def safelog(x):
        eps = np.finfo(x.dtype).tiny
        return np.log(np.maximum(x, eps))

    def log_sum_exp(x, y):
        x, y = np.maximum(x, y), np.minimum(x, y)
        return x + np.log1p(np.exp(y - x))

    assert np.abs(pred.sum(axis=1) - 1).max() <= 1e-3
    len_pred, alphabet_size = pred.shape
    (len_label,) = label.shape

    len_ex_label = len_label * 2 + 1
    ex_label = (np.zeros(len_ex_label)).astype(np.int32) + blank
    ex_label[1::2] = label

    prob = np.zeros(len_ex_label, dtype=np.float32)
    prob[0] = pred[0][ex_label[0]]
    prob[1] = pred[0][ex_label[1]]
    prob = safelog(prob)  # compute on log scale

    ex_label_pmask = ex_label[2:] != ex_label[:-2]
    for t in range(1, len_pred):
        # enter loop: prob[i] = log(p(pred[:t+1], label[:i+1]))
        new_prob = prob.copy()
        new_prob[1:] = log_sum_exp(new_prob[1:], prob[:-1])
        new_prob[2:] = (
            new_prob[2:] * (1 - ex_label_pmask)
            + log_sum_exp(new_prob[2:], prob[:-2]) * ex_label_pmask
        )
        new_prob += safelog(pred[t, ex_label])
        prob = new_prob

    return -log_sum_exp(prob[-1], prob[-2])


def test_ctc_loss():
    def test_func(T, C, N):
        input = np.random.randn(T, N, C)
        input = F.softmax(tensor(input), axis=-1).numpy()
        input_lengths = np.ones(N, dtype=np.int32) * T
        target_lengths = np.random.randint(low=1, high=T + 1, size=(N,), dtype=np.int32)
        target = np.random.randint(
            low=1, high=C, size=(sum(target_lengths)), dtype=np.int32
        )

        input_mge = tensor(input)
        input_lengths_mge = tensor(input_lengths)

        target_mge = tensor(target)
        target_lengths_mge = tensor(target_lengths)

        blank = np.random.randint(C)
        for method in ["mean", "sum", "none"]:
            np_out = ctc_nll_naive_npy(
                input,
                input_lengths,
                target,
                target_lengths,
                blank=blank,
                reduction=method,
                time_major=True,
            )
            mge_out = F.nn.ctc_loss(
                input_mge,
                input_lengths_mge,
                target_mge,
                target_lengths_mge,
                blank=blank,
                reduction=method,
            )
            np.testing.assert_allclose(mge_out.numpy(), np_out, rtol=2e-6)

    cases = [[1, 2, 1], [100, 50, 200], [100, 5, 1]]
    for case in cases:
        test_func(*case)