#include "megbrain/opr/io_remote.h"
#include "megbrain/opr/io.h"
#include "megbrain/opr/utility.h"
#include "megbrain/system.h"
#include "megbrain/test/helper.h"
#include "mock_client.h"
#include <thread>
using namespace mgb;
// Round-trips a tensor through RemoteSend/RemoteRecv inside a single graph:
// the send side runs on gpu0, the receive side on gpu1, and both sides share
// one mock group client. The received tensor must equal the original.
TEST(TestOprIORemote, Identity) {
    REQUIRE_GPU(2);
    auto send_cn = CompNode::load("gpu0");
    auto recv_cn = CompNode::load("gpu1");

    HostTensorGenerator<> gen;
    auto host_src = gen({28, 28});
    HostTensorND host_dst;

    auto client = std::make_shared<test::MockGroupClient>();
    auto graph = ComputingGraph::make();

    // Sending half: copy the host tensor onto gpu0 and publish it under "x".
    auto dev_src = opr::Host2DeviceCopy::make(*graph, host_src, send_cn);
    auto send = opr::RemoteSend::make("x", dev_src, client, false, "nccl");

    // Receiving half: pull "x" onto gpu1 with the same shape and dtype.
    auto recv = opr::RemoteRecv::make(
            "x", *graph, client, {recv_cn}, host_src->shape(), host_src->dtype(),
            "nccl");

    auto func = graph->compile({{send, {}}, make_callback_copy(recv, host_dst)});
    func->execute();
    MGB_ASSERT_TENSOR_EQ(*host_src, host_dst);
}
// Same identity round-trip as above, but each peer builds and runs its own
// graph on its own thread. The peers meet through the shared mock client
// under the key "x".
TEST(TestOprIORemote, IdentityMultiThread) {
    REQUIRE_GPU(2);
    auto cns = load_multiple_xpus(2);

    HostTensorGenerator<> gen;
    // The source tensor is generated on cns[1]; the receiver runs on cns[0].
    auto host_src = gen({2, 3}, cns[1]);
    HostTensorND host_dst;
    auto client = std::make_shared<test::MockGroupClient>();

    auto run_send = [&]() {
        auto graph = ComputingGraph::make();
        sys::set_thread_name("sender");
        auto dev_src = opr::Host2DeviceCopy::make(*graph, host_src);
        auto send = opr::RemoteSend::make("x", dev_src, client, false, "nccl");
        auto func = graph->compile({{send, {}}});
        func->execute();
    };

    auto run_recv = [&]() {
        sys::set_thread_name("receiver");
        auto graph = ComputingGraph::make();
        auto recv = opr::RemoteRecv::make(
                "x", *graph, client, {cns[0]}, host_src->shape(),
                host_src->dtype(), "nccl");
        auto func = graph->compile({make_callback_copy(recv, host_dst)});
        func->execute();
    };

    std::thread send_thread(run_send);
    std::thread recv_thread(run_recv);
    send_thread.join();
    recv_thread.join();

    // host_dst is written only by the receiver thread, which has been joined.
    MGB_ASSERT_TENSOR_EQ(*host_src, host_dst);
}
// Like IdentityMultiThread, but with arithmetic on both sides of the channel.
// The sender transmits 2 * x + 1 and the receiver undoes it with (r - 1) / 2,
// so graph optimization passes run around the remote oprs. The result must
// still match the original tensor.
TEST(TestOprIORemote, IdentityWithGopt) {
    REQUIRE_GPU(2);
    auto cns = load_multiple_xpus(2);

    HostTensorGenerator<> gen;
    auto host_src = gen({2, 3}, cns[1]);
    HostTensorND host_dst;
    auto client = std::make_shared<test::MockGroupClient>();

    auto run_send = [&]() {
        sys::set_thread_name("sender");
        auto graph = ComputingGraph::make();
        auto encoded = opr::Host2DeviceCopy::make(*graph, host_src) * 2 + 1;
        auto send = opr::RemoteSend::make("x", encoded, client, false, "nccl");
        auto func = graph->compile({{send, {}}});
        func->execute();
    };

    auto run_recv = [&]() {
        sys::set_thread_name("receiver");
        auto graph = ComputingGraph::make();
        auto recv = opr::RemoteRecv::make(
                "x", *graph, client, {cns[0]}, host_src->shape(),
                host_src->dtype(), "nccl");
        auto decoded = (recv - 1) / 2;
        auto func = graph->compile({make_callback_copy(decoded, host_dst)});
        func->execute();
    };

    std::thread send_thread(run_send);
    std::thread recv_thread(run_recv);
    send_thread.join();
    recv_thread.join();

    MGB_ASSERT_TENSOR_EQ(*host_src, host_dst);
}
// Two-way exchange between two threads.
// - The local side (cns[0]) sends x and y and waits for "z".
// - The remote side (cns[1]) receives x and y, computes z = x + y (y is
//   broadcast along axis 1), and sends z back.
// The local side also dumps its compiled graph to JSON.
TEST(TestOprIORemote, APlusB) {
    REQUIRE_GPU(2);
    auto cns = load_multiple_xpus(2);

    HostTensorGenerator<> gen;
    auto host_x = gen({5, 7}, cns[0]);
    auto host_y = gen({5, 1}, cns[0]);
    HostTensorND host_z;
    auto client = std::make_shared<test::MockGroupClient>();

    auto run_local = [&]() {
        auto graph = ComputingGraph::make();
        auto recv_z = opr::RemoteRecv::make(
                "z", *graph, client, {cns[0]}, host_x->shape(), host_x->dtype(),
                "nccl");
        auto dev_x = opr::Host2DeviceCopy::make(*graph, host_x).rename("x");
        auto dev_y = opr::Host2DeviceCopy::make(*graph, host_y).rename("y");
        auto send_x =
                opr::RemoteSend::make("x", dev_x, client, false, "nccl").rename("xr");
        auto send_y =
                opr::RemoteSend::make("y", dev_y, client, false, "nccl").rename("yr");
        auto func = graph->compile(
                {{send_x, {}}, {send_y, {}}, make_callback_copy(recv_z, host_z)});
        func->to_json()->writeto_fpath(output_file("TestOprIORemote.APlusB.json"));
        func->execute();
    };

    auto run_remote = [&]() {
        auto graph = ComputingGraph::make();
        auto recv_x = opr::RemoteRecv::make(
                "x", *graph, client, {cns[1]}, host_x->shape(), host_x->dtype(),
                "nccl");
        auto recv_y = opr::RemoteRecv::make(
                "y", *graph, client, {cns[1]}, host_y->shape(), host_y->dtype(),
                "nccl");
        auto sum = recv_x + recv_y;
        auto send_sum = opr::RemoteSend::make("z", sum, client, false, "nccl");
        auto func = graph->compile({{send_sum, {}}});
        func->execute();
    };

    std::thread local_thread(run_local);
    std::thread remote_thread(run_remote);
    local_thread.join();
    remote_thread.join();

    ASSERT_EQ(host_x->shape(), host_z.shape());
    // In row-major order, flat index idx lies in row idx / nr_cols. y holds one
    // value per row, so z[idx] must equal x[idx] + y[idx / nr_cols].
    const size_t nr_cols = host_x->shape(1);
    const size_t nr_elems = host_x->shape().total_nr_elems();
    auto px = host_x->ptr<float>();
    auto py = host_y->ptr<float>();
    auto pz = host_z.ptr<float>();
    for (size_t idx = 0; idx < nr_elems; ++idx) {
        ASSERT_FLOAT_EQ(px[idx] + py[idx / nr_cols], pz[idx]);
    }
}
// Checks gradient flow through RemoteSend.
// - The sender ships x under "loss" and takes the gradient of the
//   grad-enabled send output with respect to x.
// - The receiver gets "loss" and sends back x + 1 under "loss:grad".
// So the gradient the sender computes must equal x + 1, and the send output
// must be a single zero.
TEST(TestOprIORemote, SendGrad) {
    REQUIRE_GPU(2);
    auto cns = load_multiple_xpus(2);
    HostTensorGenerator<> gen;
    auto host_x = gen({2, 3}, cns[0]);
    HostTensorND host_gx, host_loss;
    auto client = std::make_shared<test::MockGroupClient>();
    auto sender = [&]() {
        sys::set_thread_name("sender");
        auto graph = ComputingGraph::make();
        auto x = opr::Host2DeviceCopy::make(*graph, host_x),
             loss = opr::RemoteSend::make("loss", x, client, false, "nccl");
        // A RemoteSend built with is_grad=false must produce an output with no
        // known shape that is flagged VOLATILE_CONTENT.
        // EXPECT (not ASSERT) is deliberate. A fatal assertion would return from
        // this thread before the real send below is compiled and executed. The
        // receiver thread could then block waiting for its peer, so the test
        // would hang instead of reporting the failure.
        EXPECT_TRUE(
                !loss.shape().ndim &&
                loss.node()->contain_flag(VarNode::Flag::VOLATILE_CONTENT));
        loss = opr::RemoteSend::make("loss", x, client, true, "nccl");
        auto gx = cg::grad(loss, x);
        set_priority(loss, 0);
        set_priority(gx, 1);
        auto func = graph->compile(
                {make_callback_copy(gx, host_gx), make_callback_copy(loss, host_loss)});
        // Log the compiled operator sequence for debugging.
        auto on_opr = [&](cg::OperatorNodeBase* opr) {
            mgb_log_warn("%s", opr->name().c_str());
            return true;
        };
        func->iter_opr_seq(on_opr);
        func->execute();
    };
    auto receiver = [&]() {
        sys::set_thread_name("receiver");
        auto graph = ComputingGraph::make();
        auto x = opr::RemoteRecv::make(
                "loss", *graph.get(), client, {cns[1]}, host_x->shape(),
                host_x->dtype(), "nccl");
        // Send back x + 1 as the gradient of "loss".
        auto y = opr::RemoteSend::make("loss:grad", x + 1, client, false, "nccl");
        auto func = graph->compile({{y, {}}});
        func->execute();
    };
    std::thread th_send(sender), th_recv(receiver);
    th_send.join();
    th_recv.join();
    ASSERT_EQ(host_x->shape(), host_gx.shape());
    ASSERT_EQ(TensorShape{1}, host_loss.shape());
    ASSERT_FLOAT_EQ(0.f, host_loss.ptr<float>()[0]);
    auto px = host_x->ptr<float>(), pgx = host_gx.ptr<float>();
    // Take the element count from the tensor instead of hard-coding it, so the
    // loop stays correct if the test shape changes.
    for (size_t i = 0, nr_elems = host_x->shape().total_nr_elems(); i < nr_elems;
         ++i) {
        MGB_ASSERT_FLOAT_EQ(px[i] + 1.f, pgx[i]);
    }
}