#ifndef KOKKOS_PARALLELDATAMAP_HPP
#define KOKKOS_PARALLELDATAMAP_HPP
#include <utility>
#include <limits>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <algorithm>

#include <Kokkos_Core.hpp>
#include <ParallelComm.hpp>
namespace Kokkos {
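
// ParallelDataMap: describes the partitioning of a distributed array's
// local entries and the messages needed to exchange shared values.
// Local layout implied by 'assign':
//   [ 0 , count_interior )                       owned, not sent anywhere
//   [ count_interior , count_owned )             owned and sent to other ranks
//   [ count_owned , count_owned + count_receive ) received from other ranks
// host_recv(i,0) = source rank      , host_recv(i,1) = item count
// host_send(i,0) = destination rank , host_send(i,1) = item count
// host_send_item(j) = offset within the send span of each item to send,
//                     grouped by destination message.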
struct ParallelDataMap {
  typedef View< unsigned*[2] , HostSpace >  host_recv_type ;
  typedef View< unsigned*[2] , HostSpace >  host_send_type ;
  typedef View< unsigned* ,    HostSpace >  host_send_item_type ;

  comm::Machine        machine ;
  host_recv_type       host_recv ;
  host_send_type       host_send ;
  host_send_item_type  host_send_item ;
  unsigned             count_interior ;
  unsigned             count_send ;
  unsigned             count_owned ;
  unsigned             count_receive ;
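
  // Set partition counts and allocate the host-side message tables.
  //   arg_recv_msg   : number of receive messages
  //   arg_send_msg   : number of send messages
  //   arg_send_count : total number of items sent over all messages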
  void assign( const unsigned arg_count_interior ,
               const unsigned arg_count_owned ,
               const unsigned arg_count_total ,
               const unsigned arg_recv_msg ,
               const unsigned arg_send_msg ,
               const unsigned arg_send_count )
  {
    const std::string label("Kokkos::ParallelDataMap buffer");

    count_interior = arg_count_interior ;
    count_owned    = arg_count_owned ;
    count_send     = arg_count_owned - arg_count_interior ;
    count_receive  = arg_count_total - arg_count_owned ;

    host_recv      = host_recv_type( label , arg_recv_msg );
    host_send      = host_send_type( label , arg_send_msg );
    host_send_item = host_send_item_type( label , arg_send_count );
  }
};
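
// PackArray: functor that gathers the contiguous range
// [ arg_begin , arg_begin + arg_count ) of a device array into a
// rank-one send buffer via parallel_for.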
template< class ArrayType , class Rank = void >
struct PackArray ;
template< typename DeviceType , typename ValueType >
struct PackArray< View< ValueType* , DeviceType > , void >
{
  typedef DeviceType                            execution_space ;
  typedef typename DeviceType::size_type        size_type ;
  typedef View< ValueType* , execution_space >  array_type ;
  typedef View< ValueType* , execution_space >  buffer_type ;

private:

  buffer_type  output ;
  array_type   input ;
  size_type    base ;

public:

  KOKKOS_INLINE_FUNCTION
  void operator()( const size_type i ) const
    { output[i] = input(base+i); }

  inline static
  void pack( const buffer_type & arg_output ,
             const size_type     arg_begin ,
             const size_type     arg_count ,
             const array_type  & arg_input )
  {
    // Guard against empty launches, matching the rank-two specialization.
    if ( arg_count ) {
      PackArray op ;
      op.output = arg_output ;
      op.input  = arg_input ;
      op.base   = arg_begin ;
      parallel_for( arg_count , op );
    }
  }
};
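
// Rank-two specialization: each item contributes N1 consecutive values
// to the send buffer.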
template< typename DeviceType , typename ValueType , unsigned N1 >
struct PackArray< View< ValueType*[N1] , DeviceType > , void >
{
  typedef DeviceType                                execution_space ;
  typedef typename DeviceType::size_type            size_type ;
  typedef View< ValueType*[N1] , execution_space >  array_type ;
  typedef View< ValueType* ,     execution_space >  buffer_type ;

private:

  buffer_type  output ;
  array_type   input ;
  size_type    base ;

public:

  KOKKOS_INLINE_FUNCTION
  void operator()( const size_type i ) const
  {
    for ( size_type j = 0 , k = i * N1 ; j < N1 ; ++j , ++k ) {
      output[k] = input(base+i,j);
    }
  }

  inline static
  void pack( const buffer_type & arg_output ,
             const size_type     arg_begin ,
             const size_type     arg_count ,
             const array_type  & arg_input )
  {
    if ( arg_count ) {
      PackArray op ;
      op.output = arg_output ;
      op.input  = arg_input ;
      op.base   = arg_begin ;
      parallel_for( arg_count , op );
    }
  }
};
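
// UnpackArray: functor that scatters a rank-one receive buffer back into
// the contiguous range [ arg_begin , arg_begin + arg_count ) of a device
// array via parallel_for.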
template< class ArrayType , class Rank = void > struct UnpackArray ;
template< typename DeviceType , typename ValueType >
struct UnpackArray< View< ValueType* , DeviceType > , void >
{
  typedef DeviceType                            execution_space ;
  typedef typename DeviceType::size_type        size_type ;
  typedef View< ValueType* , execution_space >  array_type ;
  typedef View< ValueType* , execution_space >  buffer_type ;

private:

  array_type   output ;
  buffer_type  input ;
  size_type    base ;

public:

  KOKKOS_INLINE_FUNCTION
  void operator()( const size_type i ) const
    { output(base+i) = input[i]; }

  inline static
  void unpack( const array_type  & arg_output ,
               const buffer_type & arg_input ,
               const size_type     arg_begin ,
               const size_type     arg_count )
  {
    // Guard against empty launches, matching the rank-two specialization.
    if ( arg_count ) {
      UnpackArray op ;
      op.output = arg_output ;
      op.input  = arg_input ;
      op.base   = arg_begin ;
      parallel_for( arg_count , op );
    }
  }
};
template< typename DeviceType , typename ValueType , unsigned N1 >
struct UnpackArray< View< ValueType*[N1] , DeviceType > , void >
{
  typedef DeviceType                                execution_space ;
  typedef typename DeviceType::size_type            size_type ;
  typedef View< ValueType* ,     execution_space >  buffer_type ;
  typedef View< ValueType*[N1] , execution_space >  array_type ;

private:

  array_type   output ;
  buffer_type  input ;
  size_type    base ;

public:

  KOKKOS_INLINE_FUNCTION
  void operator()( const size_type i ) const
  {
    for ( size_type j = 0 , k = i * N1 ; j < N1 ; ++j , ++k ) {
      output(base+i,j) = input[k];
    }
  }

  inline static
  void unpack( const array_type  & arg_output ,
               const buffer_type & arg_input ,
               const size_type     arg_begin ,
               const size_type     arg_count )
  {
    if ( arg_count ) {
      UnpackArray op ;
      op.output = arg_output ;
      op.input  = arg_input ;
      op.base   = arg_begin ;
      parallel_for( arg_count , op );
    }
  }
};
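
// AsyncExchange: exchanges the send/receive spans of a distributed array
// between processes according to a ParallelDataMap. The generic template
// is specialized below for builds with and without MPI.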
template< class ValueType , class Device , class DataMap >
class AsyncExchange ;
} // namespace Kokkos
#ifdef KOKKOS_ENABLE_MPI
namespace Kokkos {
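
// MPI specialization. The exchange runs in two phases:
//   setup()        : post MPI_Irecv for every expected message, then
//                    copy the packed send values from device to host.
//   send_receive() : gather each outgoing message into a contiguous
//                    host buffer via host_send_item, MPI_Ssend it, wait
//                    on all receives, verify each sender and message
//                    size, then copy the received values to the device.
//
// Typical call sequence (a sketch; 'field', 'data_map', and the chunk
// size are illustrative, not defined in this header):
//
//   AsyncExchange< double , Device , ParallelDataMap >
//     exchange( data_map , 1 );
//   PackArray< View< double* , Device > >::pack(
//     exchange.buffer() , data_map.count_interior ,
//     data_map.count_send , field );
//   exchange.setup();
//   // ... overlap computation on interior entries here ...
//   exchange.send_receive();
//   UnpackArray< View< double* , Device > >::unpack(
//     field , exchange.buffer() , data_map.count_owned ,
//     data_map.count_receive );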
template< class ValueType , class Device >
class AsyncExchange< ValueType , Device , Kokkos::ParallelDataMap > {
public:
  typedef Device                                        execution_space ;
  typedef Kokkos::ParallelDataMap                       data_map_type ;
  typedef Kokkos::View< ValueType* , execution_space >  buffer_dev_type ;
  typedef typename buffer_dev_type::HostMirror          buffer_host_type ;

private:

  static const int mpi_tag = 11 ;

  const data_map_type  data_map ;
  unsigned             chunk_size ;
  unsigned             send_count_max ;
  buffer_host_type     host_recv_buffer ;
  buffer_host_type     host_send_buffer ;
  buffer_host_type     send_msg_buffer ;
  buffer_dev_type      dev_buffer ;
  buffer_dev_type      dev_send_buffer ;
  buffer_dev_type      dev_recv_buffer ;

  std::vector< MPI_Request > recv_request ;

public:

  const buffer_dev_type & buffer() const { return dev_buffer ; }
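
  // One device buffer serves both directions: dev_send_buffer and
  // dev_recv_buffer are subviews aliasing its front, so it is sized to
  // the larger of the packed-send and receive lengths. Host mirrors
  // stage the MPI traffic.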
  AsyncExchange( const data_map_type & arg_data_map ,
                 const size_t arg_chunk )
    : data_map( arg_data_map )
    , chunk_size( arg_chunk )
    , send_count_max( 0 )
    , host_recv_buffer()
    , host_send_buffer()
    , send_msg_buffer()
    , dev_buffer()
    , dev_send_buffer()
    , dev_recv_buffer()
    , recv_request()
  {
    const size_t send_msg_count  = arg_data_map.host_send.dimension_0();
    const size_t recv_msg_count  = arg_data_map.host_recv.dimension_0();
    const size_t send_msg_length = arg_chunk * arg_data_map.count_send ;
    const size_t recv_msg_length = arg_chunk * arg_data_map.count_receive ;

    for ( size_t i = 0 ; i < send_msg_count ; ++i ) {
      send_count_max = std::max( send_count_max ,
                                 (unsigned) arg_data_map.host_send(i,1) );
    }

    dev_buffer = buffer_dev_type(
      std::string("AsyncExchange dev_buffer") ,
      std::max( send_msg_length , recv_msg_length ) );

    dev_send_buffer =
      Kokkos::subview( dev_buffer , std::pair<size_t,size_t>( 0 , send_msg_length ) );

    dev_recv_buffer =
      Kokkos::subview( dev_buffer , std::pair<size_t,size_t>( 0 , recv_msg_length ) );

    host_recv_buffer = buffer_host_type(
      std::string("AsyncExchange host_recv_buffer") ,
      recv_msg_length );

    host_send_buffer = buffer_host_type(
      std::string("AsyncExchange host_send_buffer") ,
      send_msg_length );

    send_msg_buffer = buffer_host_type(
      std::string("AsyncExchange send_msg_buffer") ,
      arg_chunk * send_count_max );

    recv_request.assign( recv_msg_count , MPI_REQUEST_NULL );
  }
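
  // Phase one: post a non-blocking receive for every expected message,
  // then copy the packed send values from device to host.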
  void setup()
  {
    {
      const size_t recv_msg_count = data_map.host_recv.dimension_0();

      ValueType * ptr = host_recv_buffer.ptr_on_device();

      for ( size_t i = 0 ; i < recv_msg_count ; ++i ) {
        const int proc  = data_map.host_recv(i,0);
        const int count = data_map.host_recv(i,1) * chunk_size ;

        MPI_Irecv( ptr , count * sizeof(ValueType) , MPI_BYTE ,
                   proc , mpi_tag , data_map.machine.mpi_comm ,
                   & recv_request[i] );

        ptr += count ;
      }
    }

    Kokkos::deep_copy( host_send_buffer , dev_send_buffer );
  }
  void send_receive()
  {
    const size_t recv_msg_count = data_map.host_recv.dimension_0();
    const size_t send_msg_count = data_map.host_send.dimension_0();

    // Pack and send each outgoing message:
    for ( size_t i = 0 , j = 0 ; i < send_msg_count ; ++i ) {
      const int proc  = data_map.host_send(i,0);
      const int count = data_map.host_send(i,1);

      // Gather this message's items from the staged send buffer:
      for ( int k = 0 , km = 0 ; k < count ; ++k , ++j ) {
        const int km_end = km + chunk_size ;
        int ki = chunk_size * data_map.host_send_item(j);

        for ( ; km < km_end ; ++km , ++ki ) {
          send_msg_buffer[km] = host_send_buffer[ki];
        }
      }

      MPI_Ssend( send_msg_buffer.ptr_on_device() ,
                 count * chunk_size * sizeof(ValueType) , MPI_BYTE ,
                 proc , mpi_tag , data_map.machine.mpi_comm );
    }

    // Wait for all receives and verify that each message arrived from
    // the expected process with the expected size:
    for ( size_t i = 0 ; i < recv_msg_count ; ++i ) {
      MPI_Status recv_status ;
      int recv_which = 0 ;
      int recv_size  = 0 ;

      MPI_Waitany( recv_msg_count , & recv_request[0] ,
                   & recv_which , & recv_status );

      const int recv_proc = recv_status.MPI_SOURCE ;

      MPI_Get_count( & recv_status , MPI_BYTE , & recv_size );

      const int expected_proc = data_map.host_recv(recv_which,0);
      const int expected_size = data_map.host_recv(recv_which,1) *
                                chunk_size * sizeof(ValueType);

      if ( ( expected_proc != recv_proc ) ||
           ( expected_size != recv_size ) ) {
        std::ostringstream msg ;
        msg << "AsyncExchange error:"
            << " P" << comm::rank( data_map.machine )
            << " received from P" << recv_proc
            << " size "     << recv_size
            << " expected " << expected_size
            << " from P"    << expected_proc ;
        throw std::runtime_error( msg.str() );
      }
    }

    // Copy the received values to the device:
    Kokkos::deep_copy( dev_recv_buffer , host_recv_buffer );
  }
};
} // namespace Kokkos
#else /* ! KOKKOS_ENABLE_MPI */
namespace Kokkos {
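
// Without MPI there is nothing to exchange: this specialization keeps
// the same interface but setup() and send_receive() are no-ops.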
template< class ValueType , class Device >
class AsyncExchange< ValueType , Device , Kokkos::ParallelDataMap > {
public:
  typedef Device                                        execution_space ;
  typedef Kokkos::ParallelDataMap                       data_map_type ;
  typedef Kokkos::View< ValueType* , execution_space >  buffer_dev_type ;
  typedef typename buffer_dev_type::HostMirror          buffer_host_type ;

private:

  buffer_dev_type dev_buffer ;

public:

  const buffer_dev_type & buffer() const { return dev_buffer ; }

  AsyncExchange( const data_map_type & , const size_t )
    : dev_buffer()
    { }

  void setup() { }

  void send_receive() { }
};
} // namespace Kokkos
#endif /* KOKKOS_ENABLE_MPI */
#endif /* #ifndef KOKKOS_PARALLELDATAMAP_HPP */