megenginelite-sys 1.8.2

A safe megenginelite wrapper in Rust
Documentation
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import platform

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine import Parameter, tensor
from megengine.core._imperative_rt.core2 import sync
from megengine.device import get_default_device, set_default_device
from megengine.functional.distributed import (
    all_gather,
    all_reduce_max,
    all_reduce_min,
    all_reduce_sum,
    all_to_all,
    broadcast,
    gather,
    reduce_scatter_sum,
    reduce_sum,
    remote_recv,
    remote_send,
    scatter,
)


def run_reduce_sum(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_sum(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])
        else:
            assert output is None

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = x + y
    data = (x, y)
    expect = (z, None)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_sum_multishape(shape):
    run_reduce_sum(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_sum_multidtype(dtype):
    run_reduce_sum((8, 10), dtype)


def run_broadcast(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = broadcast(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = x + 1
    data = (x, y)
    expect = (x, x)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_broadcast_multishape(shape):
    run_broadcast(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_broadcast_multidtype(dtype):
    run_broadcast((8, 10), dtype)


def run_all_gather(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_gather(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = np.concatenate((x, y))
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_gather_multishape(shape):
    run_all_gather(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_all_gather_multidtype(dtype):
    run_all_gather((8, 10), dtype)


def run_reduce_scatter_sum(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_scatter_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = x + y
    data = (x, y)
    expect = (z[: shape[0] // 2], z[shape[0] // 2 :])
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (88, 44)], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_scatter_sum_multishape(shape):
    run_reduce_scatter_sum(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_scatter_sum_multidtype(dtype):
    run_reduce_scatter_sum((8, 10), dtype)


def run_all_reduce_sum(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = x + y
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_sum_multishape(shape):
    run_all_reduce_sum(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_sum_multidtype(dtype):
    run_all_reduce_sum((8, 10), dtype)


def run_all_reduce_max(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_max(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = np.maximum(x, y)
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_max_multishape(shape):
    run_all_reduce_max(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_max_multidtype(dtype):
    run_all_reduce_max((8, 10), dtype)


def run_all_reduce_min(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_min(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = np.minimum(x, y)
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_min_multishape(shape):
    run_all_reduce_min(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_min_multidtype(dtype):
    run_all_reduce_min((8, 10), dtype)


def run_gather(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = gather(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])
        else:
            assert output is None

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    z = np.concatenate((x, y))
    data = (x, y)
    expect = (z, None)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_gather_multishape(shape):
    run_gather(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_gather_multidtype(dtype):
    run_gather((8, 10), dtype)


def run_scatter(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = scatter(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = x + 1
    data = (x, y)
    expect = (x[: shape[0] // 2], x[shape[0] // 2 :])
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (100, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_scatter_multishape(shape):
    run_scatter(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_scatter_multidtype(dtype):
    run_scatter((8, 10), dtype)


def run_all_to_all(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_to_all(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype(dtype)
    y = np.random.random_sample(shape).astype(dtype)
    a = np.concatenate((x[: shape[0] // 2], y[: shape[0] // 2]))
    b = np.concatenate((x[shape[0] // 2 :], y[shape[0] // 2 :]))
    data = (x, y)
    expect = (a, b)
    worker(data, expect)


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (100, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_to_all_multishape(shape):
    run_all_to_all(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
@pytest.mark.isolated_distributed
def test_all_to_all_multidtype(dtype):
    run_all_to_all((8, 10), dtype)


def run_io_remote(shape, dtype):
    @dist.launcher(n_gpus=2)
    def worker(val, shape):
        rank = dist.get_rank()
        if rank == 0:  # remote send
            x = tensor(val, device="xpu0")
            remote_send(x, 1)
            sync()
        else:  # remote recv
            y = remote_recv(0)
            assert y.device == get_default_device()
            np.testing.assert_almost_equal(val, y.numpy())

    val = np.random.random_sample(shape).astype(dtype)
    worker(val, shape)


@pytest.mark.require_ngpu(2)
@pytest.mark.isolated_distributed
@pytest.mark.parametrize("shape", [(), (1,), (4, 5)], ids=str)
def test_io_remote_multishape(shape):
    run_io_remote(shape, "float32")


@pytest.mark.require_ngpu(2)
@pytest.mark.isolated_distributed
@pytest.mark.parametrize("dtype", ["float32", "int32", "int8", "uint8"], ids=str)
def test_io_remote_multidtype(dtype):
    run_io_remote((8, 10), dtype)


@pytest.mark.require_ngpu(2)
def test_cuda_init_before_fork():
    a = mge.tensor(1, device="gpu0")

    @dist.launcher(n_gpus=2)
    def worker():
        a += 1
        b = mge.tensor(2)

    with pytest.raises(AssertionError):
        worker()