#include "H5Dmodule.h"
#include "H5private.h"
#include "H5CXprivate.h"
#include "H5Dpkg.h"
#include "H5Eprivate.h"
#include "H5Fprivate.h"
#include "H5FDprivate.h"
#include "H5Iprivate.h"
#include "H5MMprivate.h"
#include "H5Oprivate.h"
#include "H5Pprivate.h"
#include "H5Sprivate.h"
#include "H5VMprivate.h"
#ifdef H5_HAVE_PARALLEL
/* Values for the local `io_option` selector in H5D__chunk_collective_io():
 * linked-chunk ("one link") I/O versus per-chunk ("multi chunk") I/O.
 * The *_MORE_OPT values are the ones chosen when the user did not force a
 * chunk-optimization mode (the default io_option and the threshold branch). */
#define H5D_ONE_LINK_CHUNK_IO 0
#define H5D_MULTI_CHUNK_IO 1
#define H5D_ONE_LINK_CHUNK_IO_MORE_OPT 2
#define H5D_MULTI_CHUNK_IO_MORE_OPT 3
/* Chunk-address retrieval strategy and thresholds.
 * NOTE(review): not referenced in this part of the file; the meanings
 * (independent per-chunk address lookup vs. collective lookup of all chunk
 * addresses, and when to switch) are inferred from the names only — confirm
 * against their users. */
#define H5D_OBTAIN_ONE_CHUNK_ADDR_IND 0
#define H5D_OBTAIN_ALL_CHUNK_ADDR_COL 2
#define H5D_ALL_CHUNK_ADDR_THRES_COL 30
#define H5D_ALL_CHUNK_ADDR_THRES_COL_NUM 10000
/* Per-chunk I/O mode marker: H5D__multi_chunk_collective_io() does
 * collective I/O for a chunk whose mode-array entry equals this value and
 * independent I/O when the entry is 0. */
#define H5D_CHUNK_IO_MODE_COL 1
/* NOTE(review): not referenced in this part of the file; presumably marks a
 * regularly-shaped chunk selection — confirm. */
#define H5D_CHUNK_SELECT_REG 1
/* One selected chunk paired with its file address.
 * H5D__link_chunk_collective_io() fills an array of these via
 * H5D__sort_chunk(), then uses element [0]'s address as the base of the
 * combined MPI file type and rewrites every chunk_addr as an offset from
 * that base. NOTE(review): ascending order by chunk_addr is implied by the
 * sort/compare helper names — confirm in H5D__cmp_chunk_addr. */
typedef struct H5D_chunk_addr_info_t {
haddr_t chunk_addr;            /* file address of the chunk (later rebased to an offset) */
H5D_chunk_info_t chunk_info;   /* chunk-map entry; its fspace/mspace selections build the MPI types */
} H5D_chunk_addr_info_t;
/* Bit flags explaining why the "rank 0 reads, then broadcasts" optimization
 * cannot be used. H5D__mpio_opt_possible() ORs these into local_cause[1],
 * OR-reduces them across ranks, and enables the optimization only when no
 * such flag and no collective-I/O obstacle is set on any rank. */
typedef enum H5D_mpio_no_rank0_bcast_cause_t {
H5D_MPIO_RANK0_BCAST = 0x00,            /* no flag set */
H5D_MPIO_RANK0_NOT_H5S_ALL = 0x01,      /* file selection is not H5S_ALL */
H5D_MPIO_RANK0_NOT_CONTIGUOUS = 0x02,   /* dataset layout is not contiguous */
H5D_MPIO_RANK0_NOT_FIXED_SIZE = 0x04,   /* datatype uses variable-length storage */
H5D_MPIO_RANK0_GREATER_THAN_2GB = 0x08  /* dataset extent * type size exceeds 2 GB - 1 */
} H5D_mpio_no_rank0_bcast_cause_t;
/* Per-chunk bookkeeping for collective I/O on filtered chunked datasets.
 * For writes, each rank's array of these is gathered across ranks as raw
 * bytes (H5D__link_chunk_filtered_collective_io), so pointer members are
 * only meaningful on the rank that set them. */
typedef struct H5D_filtered_collective_io_info_t {
hsize_t index;                      /* chunk index; handed to the chunk index as udata.chunk_idx */
hsize_t scaled[H5O_LAYOUT_NDIMS];   /* scaled chunk coordinates */
hbool_t full_overwrite;             /* NOTE(review): presumably TRUE when the write covers the whole chunk — confirm */
size_t num_writers;                 /* NOTE(review): presumably number of ranks writing this chunk — confirm */
size_t io_size;                     /* NOTE(review): not used in this section; presumably bytes of I/O for the chunk */
void *buf;                          /* chunk data buffer; freed in H5D__link_chunk_filtered_collective_io cleanup */
struct {
H5F_block_t chunk_current;          /* current file block; input to H5D__chunk_file_alloc */
H5F_block_t new_chunk;              /* file block (re)allocated for the written chunk */
} chunk_states;
struct {
int original_owner;                 /* NOTE(review): presumably the rank that originally selected the chunk */
int new_owner;                      /* rank that performs the filtered I/O for this chunk */
} owners;
struct {
MPI_Request *receive_requests_array;  /* NOTE(review): not used in this section; presumably pending receives */
unsigned char **receive_buffer_array; /* NOTE(review): presumably buffers matching receive_requests_array */
int num_receive_requests;             /* NOTE(review): presumably length of the two arrays above */
} async_info;
} H5D_filtered_collective_io_info_t;
/* Local prototypes: chunked collective I/O drivers (linked-chunk and
 * multi-chunk, with and without filters). */
static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm);
static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm);
static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm);
static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk);
static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm);
/* Local prototypes: the final MPI-IO transfer helpers. */
static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5S_t *file_space,
const H5S_t *mem_space);
static herr_t H5D__final_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, hsize_t nelmts, MPI_Datatype mpi_file_type,
MPI_Datatype mpi_buf_type);
/* Local prototypes: chunk ordering, per-chunk I/O mode and chunk counting. */
static herr_t H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
H5D_chunk_addr_info_t chunk_addr_info_array[], int many_chunk_opt);
static herr_t H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm,
uint8_t assign_io_mode[], haddr_t chunk_addr[]);
static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
const H5D_chunk_map_t *fm, int *sum_chunkf);
/* Local prototypes: filtered collective I/O support. */
static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
/* Chunk redistribution among ranks is only compiled with MPI-3 or newer. */
#if MPI_VERSION >= 3
static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries);
#endif
static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
static herr_t H5D__mpio_filtered_collective_write_type(
H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
MPI_Datatype *new_file_type, hbool_t *file_type_derived);
static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm);
/* Local prototypes: qsort comparators. */
static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
const void *filtered_collective_io_info_entry2);
#if MPI_VERSION >= 3
static int H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1,
const void *filtered_collective_io_info_entry2);
#endif
htri_t
H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space,
const H5S_t *mem_space, const H5D_type_info_t *type_info)
{
H5FD_mpio_xfer_t io_xfer_mode;
unsigned local_cause[2] = {0,0};
unsigned global_cause[2] = {0,0};
htri_t is_vl_storage;
htri_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
HDassert(io_info);
HDassert(mem_space);
HDassert(file_space);
HDassert(type_info);
if(H5CX_get_io_xfer_mode(&io_xfer_mode) < 0)
local_cause[0] |= H5D_MPIO_ERROR_WHILE_CHECKING_COLLECTIVE_POSSIBLE;
if(io_xfer_mode == H5FD_MPIO_INDEPENDENT)
local_cause[0] |= H5D_MPIO_SET_INDEPENDENT;
if(!H5FD_mpi_opt_types_g)
local_cause[0] |= H5D_MPIO_MPI_OPT_TYPES_ENV_VAR_DISABLED;
if(!type_info->is_conv_noop)
local_cause[0] |= H5D_MPIO_DATATYPE_CONVERSION;
if(!type_info->is_xform_noop)
local_cause[0] |= H5D_MPIO_DATA_TRANSFORMS;
if(!((H5S_SIMPLE == H5S_GET_EXTENT_TYPE(mem_space) || H5S_SCALAR == H5S_GET_EXTENT_TYPE(mem_space))
&& (H5S_SIMPLE == H5S_GET_EXTENT_TYPE(file_space) || H5S_SCALAR == H5S_GET_EXTENT_TYPE(file_space))))
local_cause[0] |= H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES;
if(!(io_info->dset->shared->layout.type == H5D_CONTIGUOUS ||
io_info->dset->shared->layout.type == H5D_CHUNKED))
local_cause[0] |= H5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET;
if(io_info->dset->shared->dcpl_cache.efl.nused > 0)
local_cause[0] |= H5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET;
#if MPI_VERSION < 3
if(io_info->op_type == H5D_IO_OP_WRITE &&
io_info->dset->shared->layout.type == H5D_CHUNKED &&
io_info->dset->shared->dcpl_cache.pline.nused > 0)
local_cause[0] |= H5D_MPIO_PARALLEL_FILTERED_WRITES_DISABLED;
#endif
if(H5S_GET_SELECT_TYPE(file_space) != H5S_SEL_ALL)
local_cause[1] |= H5D_MPIO_RANK0_NOT_H5S_ALL;
else if(H5D_CONTIGUOUS != io_info->dset->shared->layout.type)
local_cause[1] |= H5D_MPIO_RANK0_NOT_CONTIGUOUS;
else if((is_vl_storage = H5T_is_vl_storage(type_info->dset_type)) < 0)
local_cause[0] |= H5D_MPIO_ERROR_WHILE_CHECKING_COLLECTIVE_POSSIBLE;
else if(is_vl_storage)
local_cause[1] |= H5D_MPIO_RANK0_NOT_FIXED_SIZE;
else {
size_t type_size;
if(0 == (type_size = H5T_GET_SIZE(type_info->dset_type)))
local_cause[0] |= H5D_MPIO_ERROR_WHILE_CHECKING_COLLECTIVE_POSSIBLE;
else {
hssize_t snelmts;
if((snelmts = H5S_GET_EXTENT_NPOINTS(file_space)) < 0)
local_cause[0] |= H5D_MPIO_ERROR_WHILE_CHECKING_COLLECTIVE_POSSIBLE;
else {
hsize_t dset_size;
dset_size = ((hsize_t)snelmts) * type_size;
if(dset_size > ((hsize_t)(2.0F * H5_GB) - 1))
local_cause[1] |= H5D_MPIO_RANK0_GREATER_THAN_2GB;
}
}
}
if(local_cause[0] & H5D_MPIO_SET_INDEPENDENT)
global_cause[0] = local_cause[0];
else {
int mpi_code;
if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_cause, &global_cause, 2, MPI_UNSIGNED, MPI_BOR, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
}
H5CX_set_mpio_local_no_coll_cause(local_cause[0]);
H5CX_set_mpio_global_no_coll_cause(global_cause[0]);
if(global_cause[0] == 0 && global_cause[1] == 0) {
H5CX_set_mpio_rank0_bcast(TRUE);
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
H5CX_test_set_mpio_coll_rank0_bcast(TRUE);
#endif
}
ret_value = global_cause[0] > 0 ? FALSE : TRUE;
done:
FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__mpio_select_read
 *
 * Issue the MPI-IO read for a collective transfer against contiguous
 * storage: the file address comes from io_info->store->contig, the length
 * argument is mpi_buf_count (checked to fit in size_t), and the data lands
 * in io_info->u.rbuf. type_info, file_space and mem_space are unused.
 */
herr_t
H5D__mpio_select_read(const H5D_io_info_t *io_info, const H5D_type_info_t H5_ATTR_UNUSED *type_info,
    hsize_t mpi_buf_count, const H5S_t H5_ATTR_UNUSED *file_space, const H5S_t H5_ATTR_UNUSED *mem_space)
{
    const H5D_contig_storage_t *contig = &io_info->store->contig;   /* provides the file address */
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    /* H5F_block_read() takes its length as size_t */
    H5_CHECK_OVERFLOW(mpi_buf_count, hsize_t, size_t);

    if(H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, contig->dset_addr,
            (size_t)mpi_buf_count, io_info->u.rbuf) < 0)
        HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't finish collective parallel read")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__mpio_select_write
 *
 * Issue the MPI-IO write for a collective transfer against contiguous
 * storage: the file address comes from io_info->store->contig, the length
 * argument is mpi_buf_count (checked to fit in size_t), and the data is taken
 * from io_info->u.wbuf. type_info, file_space and mem_space are unused.
 */
herr_t
H5D__mpio_select_write(const H5D_io_info_t *io_info, const H5D_type_info_t H5_ATTR_UNUSED *type_info,
    hsize_t mpi_buf_count, const H5S_t H5_ATTR_UNUSED *file_space, const H5S_t H5_ATTR_UNUSED *mem_space)
{
    const H5D_contig_storage_t *contig = &io_info->store->contig;   /* provides the file address */
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    /* H5F_block_write() takes its length as size_t */
    H5_CHECK_OVERFLOW(mpi_buf_count, hsize_t, size_t);

    if(H5F_block_write(io_info->dset->oloc.file, H5FD_MEM_DRAW, contig->dset_addr,
            (size_t)mpi_buf_count, io_info->u.wbuf) < 0)
        HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "can't finish collective parallel write")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__mpio_array_gatherv
 *
 * Gather every rank's array of fixed-size entries into one array, either
 * on all ranks (allgather == TRUE) or only on `root`. The entries are
 * transferred as raw bytes, and the result is optionally qsort()ed with
 * sort_func on the ranks that receive it.
 *
 * On success *_gathered_array receives a newly allocated array, which the
 * caller frees, and *_gathered_array_num_entries receives the total entry
 * count. The array pointer is NULL when the total count is zero, and on
 * non-root ranks when allgather is FALSE. On failure the outputs are left
 * untouched and nothing is leaked.
 */
static herr_t
H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
    size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries,
    hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
{
    size_t gathered_array_num_entries = 0;
    void *gathered_array = NULL;
    int *receive_counts_array = NULL;
    int *displacements_array = NULL;
    int local_num_entries = 0;      /* local_array_num_entries as int, matching MPI_INT */
    int total_num_entries = 0;      /* sum of local_num_entries across comm */
    int mpi_code, mpi_rank, mpi_size;
    int sendcount;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(_gathered_array);
    HDassert(_gathered_array_num_entries);

    if (MPI_SUCCESS != (mpi_code = MPI_Comm_size(comm, &mpi_size)))
        HMPI_GOTO_ERROR(FAIL, "MPI_Comm_size failed", mpi_code)
    if (MPI_SUCCESS != (mpi_code = MPI_Comm_rank(comm, &mpi_rank)))
        HMPI_GOTO_ERROR(FAIL, "MPI_Comm_rank failed", mpi_code)

    /* Entry counts travel as MPI_INT, so they must be stored in an int.
     * Handing MPI the address of the size_t directly only works by accident
     * on little-endian hosts. */
    H5_CHECKED_ASSIGN(local_num_entries, int, local_array_num_entries, size_t);

    if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_num_entries, &total_num_entries, 1, MPI_INT, MPI_SUM, comm)))
        HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
    gathered_array_num_entries = (size_t) total_num_entries;

    if (gathered_array_num_entries > 0) {
        /* Only receiving ranks need the output and count/displacement arrays */
        if (allgather || (mpi_rank == root)) {
            if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
            if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
            if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
        }

        /* Collect each rank's entry count */
        if (allgather) {
            if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
        }
        else {
            if (MPI_SUCCESS != (mpi_code = MPI_Gather(&local_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, root, comm)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code)
        }

        /* Convert the entry counts into byte counts and prefix-sum the displacements */
        if (allgather || (mpi_rank == root)) {
            size_t i;

            for (i = 0; i < (size_t) mpi_size; i++)
                H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);

            displacements_array[0] = 0;
            for (i = 1; i < (size_t) mpi_size; i++)
                displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
        }

        H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);

        if (allgather) {
            if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
                    gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
        }
        else {
            if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE,
                    gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Gatherv failed", mpi_code)
        }

        if (sort_func && (allgather || (mpi_rank == root)))
            HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
    }

    *_gathered_array = gathered_array;
    *_gathered_array_num_entries = gathered_array_num_entries;

done:
    /* On failure the gathered array was never handed to the caller */
    if (ret_value < 0 && gathered_array)
        H5MM_free(gathered_array);
    if (receive_counts_array)
        H5MM_free(receive_counts_array);
    if (displacements_array)
        H5MM_free(displacements_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
static herr_t
H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
int *sum_chunkf)
{
int num_chunkf;
size_t ori_num_chunkf;
int mpi_code;
herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
num_chunkf = 0;
ori_num_chunkf = H5SL_count(fm->sel_chunks);
H5_CHECKED_ASSIGN(num_chunkf, int, ori_num_chunkf, size_t);
if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, sum_chunkf, 1, MPI_INT, MPI_SUM, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
done:
FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__contig_collective_read
 *
 * Collective read from a contiguous dataset: delegates to
 * H5D__inter_collective_io() and, on success, records
 * H5D_MPIO_CONTIGUOUS_COLLECTIVE as the actual I/O mode in the API context.
 */
herr_t
H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    hsize_t H5_ATTR_UNUSED nelmts, const H5S_t *file_space, const H5S_t *mem_space,
    H5D_chunk_map_t H5_ATTR_UNUSED *fm)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    /* This path is only reached with the MPI-IO file driver */
    HDassert(H5FD_MPIO == H5F_DRIVER_ID(io_info->dset->oloc.file));

    if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0)
        HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO")

    /* Report the I/O mode that was actually used */
    H5CX_set_mpio_actual_io_mode(H5D_MPIO_CONTIGUOUS_COLLECTIVE);

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__contig_collective_write
 *
 * Collective write to a contiguous dataset: delegates to
 * H5D__inter_collective_io() and, on success, records
 * H5D_MPIO_CONTIGUOUS_COLLECTIVE as the actual I/O mode in the API context.
 */
herr_t
H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    hsize_t H5_ATTR_UNUSED nelmts, const H5S_t *file_space, const H5S_t *mem_space,
    H5D_chunk_map_t H5_ATTR_UNUSED *fm)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    /* This path is only reached with the MPI-IO file driver */
    HDassert(H5FD_MPIO == H5F_DRIVER_ID(io_info->dset->oloc.file));

    if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0)
        HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO")

    /* Report the I/O mode that was actually used */
    H5CX_set_mpio_actual_io_mode(H5D_MPIO_CONTIGUOUS_COLLECTIVE);

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__chunk_collective_io
 *
 * Dispatcher for collective I/O on chunked datasets.
 *
 * The strategy is picked as follows:
 *   - If the user forced "one I/O", use linked-chunk I/O.
 *   - If the user forced "multi I/O", use multi-chunk I/O.
 *   - Otherwise, use linked-chunk I/O when the average number of selected
 *     chunks per rank reaches the chunk-opt threshold, and multi-chunk I/O
 *     when it does not.
 *
 * Datasets with filters use the filtered variants. Filtered reads always go
 * through the multi-chunk filtered path, even when linked-chunk I/O was
 * selected.
 */
static herr_t
H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    H5D_chunk_map_t *fm)
{
    H5FD_mpio_chunk_opt_t chunk_opt_mode;
    int io_option = H5D_MULTI_CHUNK_IO_MORE_OPT;
    int sum_chunk = -1;             /* stays -1 unless computed below; callee recomputes if needed */
    hbool_t has_filters;            /* dataset has an I/O filter pipeline */
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
    htri_t temp_not_link_io = FALSE;
#endif
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(io_info);
    HDassert(io_info->using_mpi_vfd);
    HDassert(type_info);
    HDassert(fm);

    H5CX_set_coll_metadata_read(FALSE);

    if(H5CX_get_mpio_chunk_opt_mode(&chunk_opt_mode) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't get chunk optimization option")

    switch(chunk_opt_mode) {
        case H5FD_MPIO_CHUNK_ONE_IO:
            io_option = H5D_ONE_LINK_CHUNK_IO;
            break;

        case H5FD_MPIO_CHUNK_MULTI_IO:
            io_option = H5D_MULTI_CHUNK_IO;
            break;

        default: {
            unsigned one_link_chunk_io_threshold;
            int mpi_size;

            if(H5D__mpio_get_sum_chunk(io_info, fm, &sum_chunk) < 0)
                HGOTO_ERROR(H5E_DATASPACE, H5E_CANTSWAP, FAIL, "unable to obtain the total chunk number of all processes")
            if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
                HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
            if(H5CX_get_mpio_chunk_opt_num(&one_link_chunk_io_threshold) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't get chunk optimization option threshold value")

            /* Average selected chunks per rank decides linked vs. multi */
            if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold)
                io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT;
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
            else
                temp_not_link_io = TRUE;
#endif
            break;
        }
    }

#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
    {
        if(H5D_ONE_LINK_CHUNK_IO == io_option) {
            if(H5CX_test_set_mpio_coll_chunk_link_hard(0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
        }
        else if(H5D_MULTI_CHUNK_IO == io_option) {
            if(H5CX_test_set_mpio_coll_chunk_multi_hard(0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
        }
        else if(H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) {
            if(H5CX_test_set_mpio_coll_chunk_link_num_true(0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
        }
        else if(temp_not_link_io) {
            if(H5CX_test_set_mpio_coll_chunk_link_num_false(0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
        }
    }
#endif

    has_filters = (io_info->dset->shared->dcpl_cache.pline.nused > 0);

    if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) {
        if(!has_filters) {
            if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
        }
        else if(io_info->op_type == H5D_IO_OP_READ) {
            /* Filtered reads use the multi-chunk filtered path */
            if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
        }
        else {
            if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO")
        }
    }
    else if(has_filters) {
        if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm) < 0)
            HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
    }
    else {
        if(H5D__multi_chunk_collective_io(io_info, type_info, fm) < 0)
            HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
    }

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__chunk_collective_read
 *
 * Collective read from a chunked dataset. All strategy selection happens in
 * H5D__chunk_collective_io(); nelmts, file_space and mem_space are unused.
 */
herr_t
H5D__chunk_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    hsize_t H5_ATTR_UNUSED nelmts, const H5S_t H5_ATTR_UNUSED *file_space, const H5S_t H5_ATTR_UNUSED *mem_space,
    H5D_chunk_map_t *fm)
{
    herr_t io_status;               /* result of the shared chunked collective path */
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    io_status = H5D__chunk_collective_io(io_info, type_info, fm);
    if(io_status < 0)
        HGOTO_ERROR(H5E_DATASPACE, H5E_READERROR, FAIL, "read error")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__chunk_collective_write
 *
 * Collective write to a chunked dataset. All strategy selection happens in
 * H5D__chunk_collective_io(); nelmts, file_space and mem_space are unused.
 */
herr_t
H5D__chunk_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    hsize_t H5_ATTR_UNUSED nelmts, const H5S_t H5_ATTR_UNUSED *file_space, const H5S_t H5_ATTR_UNUSED *mem_space,
    H5D_chunk_map_t *fm)
{
    herr_t io_status;               /* result of the shared chunked collective path */
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_PACKAGE

    io_status = H5D__chunk_collective_io(io_info, type_info, fm);
    if(io_status < 0)
        HGOTO_ERROR(H5E_DATASPACE, H5E_WRITEERROR, FAIL, "write error")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__link_chunk_collective_io
 *
 * Linked-chunk collective I/O for unfiltered chunked datasets. All chunks
 * this rank selected are described by one combined MPI file type and one
 * combined MPI memory type, and a single collective transfer is issued.
 *
 * If the dataset has exactly one chunk, the transfer goes directly through
 * H5D__inter_collective_io() on that chunk's address. Otherwise:
 *   1. The selected chunks are sorted via H5D__sort_chunk().
 *   2. The lowest-sorted chunk's address becomes the base address.
 *   3. Per-chunk MPI types are built from each chunk's file and memory
 *      selections.
 *   4. They are combined with MPI_Type_create_struct and passed to
 *      H5D__final_collective_io().
 *
 * A rank with no selected chunks still takes part, using a zero-count
 * MPI_BYTE transfer.
 *
 * sum_chunk is the total number of selected chunks over all ranks; if it
 * is negative it is computed here (a collective call).
 */
static herr_t
H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    H5D_chunk_map_t *fm, int sum_chunk)
{
    H5D_chunk_addr_info_t *chunk_addr_info_array = NULL;
    MPI_Datatype chunk_final_mtype;                 /* combined memory type */
    hbool_t chunk_final_mtype_is_derived = FALSE;
    MPI_Datatype chunk_final_ftype;                 /* combined file type */
    hbool_t chunk_final_ftype_is_derived = FALSE;
    H5D_storage_t ctg_store;                        /* storage info handed to the final I/O */
    size_t total_chunks;
    size_t num_chunk = 0;                           /* chunks selected here; function scope so cleanup sees it */
    size_t u;
    MPI_Datatype *chunk_mtype = NULL;
    MPI_Datatype *chunk_ftype = NULL;
    MPI_Aint *chunk_disp_array = NULL;
    MPI_Aint *chunk_mem_disp_array = NULL;
    hbool_t *chunk_mft_is_derived_array = NULL;     /* per-chunk: file type is derived */
    hbool_t *chunk_mbt_is_derived_array = NULL;     /* per-chunk: memory type is derived */
    int *chunk_mpi_file_counts = NULL;
    int *chunk_mpi_mem_counts = NULL;
    int mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_LINK_CHUNK);
    H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);

    if(sum_chunk < 0) {
        if(H5D__mpio_get_sum_chunk(io_info, fm, &sum_chunk) < 0)
            HGOTO_ERROR(H5E_DATASPACE, H5E_CANTSWAP, FAIL, "unable to obtain the total chunk number of all processes");
    }

    H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);

    if(total_chunks == 1) {
        H5SL_node_t *chunk_node;
        H5S_t *fspace;
        H5S_t *mspace;

        chunk_node = H5SL_first(fm->sel_chunks);
        if(chunk_node == NULL) {
            /* This rank selected nothing but must still join the collective call */
            fspace = mspace = NULL;
            ctg_store.contig.dset_addr = 0;
        }
        else {
            H5D_chunk_ud_t udata;
            H5D_chunk_info_t *chunk_info;

            if(NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node)))
                HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL, "couldn't get chunk info from skip list")
            fspace = chunk_info->fspace;
            mspace = chunk_info->mspace;
            if(H5D__chunk_lookup(io_info->dset, chunk_info->scaled, &udata) < 0)
                HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL, "couldn't get chunk address")
            ctg_store.contig.dset_addr = udata.chunk_block.offset;
        }

        io_info->store = &ctg_store;
#ifdef H5D_DEBUG
        if(H5DEBUG(D))
            HDfprintf(H5DEBUG(D),"before inter_collective_io for total chunk = 1 \n");
#endif
        if(H5D__inter_collective_io(io_info, type_info, fspace, mspace) < 0)
            HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO")
    }
    else {
        hsize_t mpi_buf_count;

        num_chunk = H5SL_count(fm->sel_chunks);
        H5_CHECK_OVERFLOW(num_chunk, size_t, int);
#ifdef H5D_DEBUG
        if(H5DEBUG(D))
            HDfprintf(H5DEBUG(D),"total_chunks = %Zu, num_chunk = %Zu\n", total_chunks, num_chunk);
#endif
        if(num_chunk) {
            if(NULL == (chunk_addr_info_array = (H5D_chunk_addr_info_t *)H5MM_malloc(num_chunk * sizeof(H5D_chunk_addr_info_t))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk array buffer")
            if(NULL == (chunk_mtype = (MPI_Datatype *)H5MM_malloc(num_chunk * sizeof(MPI_Datatype))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk memory datatype buffer")
            if(NULL == (chunk_ftype = (MPI_Datatype *)H5MM_malloc(num_chunk * sizeof(MPI_Datatype))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file datatype buffer")
            if(NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc(num_chunk * sizeof(MPI_Aint))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer")
            /* Memory displacements stay zero: every memory type is relative to the user buffer */
            if(NULL == (chunk_mem_disp_array = (MPI_Aint *)H5MM_calloc(num_chunk * sizeof(MPI_Aint))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk memory displacement buffer")
            if(NULL == (chunk_mpi_mem_counts = (int *)H5MM_calloc(num_chunk * sizeof(int))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk memory counts buffer")
            if(NULL == (chunk_mpi_file_counts = (int *)H5MM_calloc(num_chunk * sizeof(int))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file counts buffer")
            /* Zero-filled so cleanup only frees types that were actually created */
            if(NULL == (chunk_mbt_is_derived_array = (hbool_t *)H5MM_calloc(num_chunk * sizeof(hbool_t))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk memory is derived datatype flags buffer")
            if(NULL == (chunk_mft_is_derived_array = (hbool_t *)H5MM_calloc(num_chunk * sizeof(hbool_t))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file is derived datatype flags buffer")

#ifdef H5D_DEBUG
            if(H5DEBUG(D))
                HDfprintf(H5DEBUG(D),"before sorting the chunk address \n");
#endif
            if(H5D__sort_chunk(io_info, fm, chunk_addr_info_array, sum_chunk) < 0)
                HGOTO_ERROR(H5E_DATASPACE, H5E_CANTSWAP, FAIL, "unable to sort chunk address")
            ctg_store.contig.dset_addr = chunk_addr_info_array[0].chunk_addr;
#ifdef H5D_DEBUG
            if(H5DEBUG(D))
                HDfprintf(H5DEBUG(D),"after sorting the chunk address \n");
#endif

            /* Build per-chunk file/memory types; file displacements are
             * offsets from the first sorted chunk's address */
            for(u = 0; u < num_chunk; u++) {
                hsize_t *permute_map = NULL;
                hbool_t is_permuted = FALSE;

                if(H5S_mpio_space_type(chunk_addr_info_array[u].chunk_info.fspace,
                        type_info->src_type_size,
                        &chunk_ftype[u],
                        &chunk_mpi_file_counts[u],
                        &(chunk_mft_is_derived_array[u]),
                        TRUE,
                        &permute_map,
                        &is_permuted) < 0)
                    HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI file type")
                if(is_permuted)
                    HDassert(permute_map);
                if(H5S_mpio_space_type(chunk_addr_info_array[u].chunk_info.mspace,
                        type_info->dst_type_size, &chunk_mtype[u],
                        &chunk_mpi_mem_counts[u],
                        &(chunk_mbt_is_derived_array[u]),
                        FALSE,
                        &permute_map,
                        &is_permuted) < 0)
                    HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI buf type")
                if(is_permuted)
                    HDassert(!permute_map);

                chunk_addr_info_array[u].chunk_addr -= ctg_store.contig.dset_addr;
                chunk_disp_array[u] = (MPI_Aint)chunk_addr_info_array[u].chunk_addr;
            }

            if(MPI_SUCCESS != (mpi_code = MPI_Type_create_struct((int)num_chunk, chunk_mpi_file_counts, chunk_disp_array, chunk_ftype, &chunk_final_ftype)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
            if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&chunk_final_ftype)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
            chunk_final_ftype_is_derived = TRUE;

            if(MPI_SUCCESS != (mpi_code = MPI_Type_create_struct((int)num_chunk, chunk_mpi_mem_counts, chunk_mem_disp_array, chunk_mtype, &chunk_final_mtype)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
            if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&chunk_final_mtype)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
            chunk_final_mtype_is_derived = TRUE;

            /* The combined types no longer need the per-chunk ones. Clear each
             * flag once released so the cleanup code does not free it again. */
            for(u = 0; u < num_chunk; u++) {
                if(chunk_mbt_is_derived_array[u]) {
                    if(MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_mtype + u)))
                        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
                    chunk_mbt_is_derived_array[u] = FALSE;
                }
                if(chunk_mft_is_derived_array[u]) {
                    if(MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_ftype + u)))
                        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
                    chunk_mft_is_derived_array[u] = FALSE;
                }
            }

            mpi_buf_count = (hsize_t)1;
        }
        else {
            /* No chunks selected here: take part with an empty transfer */
            ctg_store.contig.dset_addr = 0;
            chunk_final_ftype = MPI_BYTE;
            chunk_final_mtype = MPI_BYTE;
            mpi_buf_count = (hsize_t)0;
        }

#ifdef H5D_DEBUG
        if(H5DEBUG(D))
            HDfprintf(H5DEBUG(D),"before coming to final collective IO\n");
#endif
        io_info->store = &ctg_store;
        if(H5D__final_collective_io(io_info, type_info, mpi_buf_count, chunk_final_ftype, chunk_final_mtype) < 0)
            HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
    }

done:
#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D),"before freeing memory inside H5D_link_collective_io ret_value = %d\n", ret_value);
#endif
    /* On error paths, per-chunk derived types built above may still be
     * committed. The success path has already released them and cleared
     * their flags. */
    if(chunk_mtype && chunk_mbt_is_derived_array) {
        for(u = 0; u < num_chunk; u++) {
            if(chunk_mbt_is_derived_array[u] && MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_mtype + u))) {
                HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
            }
        }
    }
    if(chunk_ftype && chunk_mft_is_derived_array) {
        for(u = 0; u < num_chunk; u++) {
            if(chunk_mft_is_derived_array[u] && MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_ftype + u))) {
                HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
            }
        }
    }

    if(chunk_addr_info_array)
        H5MM_xfree(chunk_addr_info_array);
    if(chunk_mtype)
        H5MM_xfree(chunk_mtype);
    if(chunk_ftype)
        H5MM_xfree(chunk_ftype);
    if(chunk_disp_array)
        H5MM_xfree(chunk_disp_array);
    if(chunk_mem_disp_array)
        H5MM_xfree(chunk_mem_disp_array);
    if(chunk_mpi_mem_counts)
        H5MM_xfree(chunk_mpi_mem_counts);
    if(chunk_mpi_file_counts)
        H5MM_xfree(chunk_mpi_file_counts);
    if(chunk_mbt_is_derived_array)
        H5MM_xfree(chunk_mbt_is_derived_array);
    if(chunk_mft_is_derived_array)
        H5MM_xfree(chunk_mft_is_derived_array);

    if(chunk_final_mtype_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&chunk_final_mtype)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
    if(chunk_final_ftype_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&chunk_final_ftype)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5D__link_chunk_filtered_collective_io
 *
 * Linked-chunk collective I/O for chunked datasets with filters. The
 * visible code only acts for H5D_IO_OP_WRITE; the dispatcher sends filtered
 * reads elsewhere.
 *
 * For a write:
 *   1. Each rank processes the chunks it owns (owners.new_owner) through
 *      H5D__filtered_collective_chunk_entry_io().
 *   2. Every rank's chunk list is all-gathered so that all ranks make
 *      identical file-space allocations.
 *   3. This rank's slice, now carrying the allocated blocks, is copied back.
 *   4. One collective write is issued.
 *   5. Every chunk's new address is inserted into the chunk index.
 */
static herr_t
H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    H5D_chunk_map_t *fm)
{
    H5D_filtered_collective_io_info_t *chunk_list = NULL;               /* this rank's chunks */
    H5D_filtered_collective_io_info_t *collective_chunk_list = NULL;    /* all ranks' chunks */
    H5D_storage_t ctg_store;
    MPI_Datatype mem_type = MPI_BYTE;
    MPI_Datatype file_type = MPI_BYTE;
    hbool_t mem_type_is_derived = FALSE;
    hbool_t file_type_is_derived = FALSE;
    size_t chunk_list_num_entries = 0;              /* zero so cleanup is safe if list construction fails */
    size_t collective_chunk_list_num_entries = 0;
    size_t *num_chunks_selected_array = NULL;       /* per-rank chunk counts */
    size_t i;
    int mpi_rank, mpi_size, mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(io_info);
    HDassert(type_info);
    HDassert(fm);

    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")

    H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_LINK_CHUNK);
    H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);

    if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")

    if (io_info->op_type == H5D_IO_OP_WRITE) {
        H5D_chk_idx_info_t index_info;
        H5D_chunk_ud_t udata;
        hsize_t mpi_buf_count;

        index_info.f = io_info->dset->oloc.file;
        index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
        index_info.layout = &(io_info->dset->shared->layout.u.chunk);
        index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);

        udata.common.layout = index_info.layout;
        udata.common.storage = index_info.storage;
        udata.filter_mask = 0;

        /* Only the owning rank reads/modifies/filters a chunk */
        for (i = 0; i < chunk_list_num_entries; i++)
            if (mpi_rank == chunk_list[i].owners.new_owner)
                if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")

        /* Every rank needs every chunk's new size to allocate file space identically */
        if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t),
                (void **) &collective_chunk_list, &collective_chunk_list_num_entries, TRUE, 0, io_info->comm, NULL) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")

        for (i = 0; i < collective_chunk_list_num_entries; i++) {
            hbool_t insert;

            if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current,
                    &collective_chunk_list[i].chunk_states.new_chunk, &insert, collective_chunk_list[i].scaled) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
        }

        if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(size_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")

        /* size_t has no portable MPI datatype (it is not always 64 bits), so
         * exchange it as raw bytes. Assumes all ranks share the same size_t
         * representation, as the previous MPI_UNSIGNED_LONG_LONG exchange did. */
        if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, (int) sizeof(size_t), MPI_BYTE,
                num_chunks_selected_array, (int) sizeof(size_t), MPI_BYTE, io_info->comm)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)

        if (chunk_list_num_entries) {
            size_t offset;

            /* This rank's entries start after those of all lower ranks */
            for (i = 0, offset = 0; i < (size_t) mpi_rank; i++)
                offset += num_chunks_selected_array[i];

            /* Pull back this rank's slice with its newly allocated file blocks */
            H5MM_memcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));

            if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
                    &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type")

            io_info->u.wbuf = chunk_list[0].buf;
        }

        /* Ranks without derived types still join the collective write, with a zero count */
        mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;

        ctg_store.contig.dset_addr = 0;
        io_info->store = &ctg_store;

        if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, file_type, mem_type) < 0)
            HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")

        /* Every rank inserts every chunk, keeping the index update collective */
        for (i = 0; i < collective_chunk_list_num_entries; i++) {
            udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk;
            udata.common.scaled = collective_chunk_list[i].scaled;
            udata.chunk_idx = collective_chunk_list[i].index;

            if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
        }
    }

done:
    if (chunk_list) {
        for (i = 0; i < chunk_list_num_entries; i++)
            if (chunk_list[i].buf)
                H5MM_free(chunk_list[i].buf);
        H5MM_free(chunk_list);
    }
    if (num_chunks_selected_array)
        H5MM_free(num_chunks_selected_array);
    if (collective_chunk_list)
        H5MM_free(collective_chunk_list);

    if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
    if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Perform I/O on a chunked dataset one chunk at a time ("multi-chunk" mode).
 *
 * H5D__obtain_mpio_mode() fills, for every chunk of the dataset, an I/O mode
 * (H5D_CHUNK_IO_MODE_COL for collective, 0 for independent) and the chunk's
 * file address.  Every rank then visits all chunks in index order.  A rank
 * with no selection in a chunk still calls H5D__inter_collective_io() with
 * NULL dataspaces (presumably so that per-chunk collective calls stay matched
 * across ranks).  The transfer mode in the API context is only switched when
 * it differs from the previous chunk's, and the union of the modes this rank
 * actually used is recorded with H5CX_set_mpio_actual_io_mode().
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    H5D_chunk_map_t *fm)
{
    H5D_io_info_t ctg_io_info;          /* I/O info for a chunk treated as contiguous storage */
    H5D_storage_t ctg_store;            /* Storage info for that contiguous view */
    uint8_t *chunk_io_option = NULL;    /* Per-chunk I/O mode */
    haddr_t *chunk_addr = NULL;         /* Per-chunk file address */
    H5D_storage_t store;                /* Chunk storage info installed in io_info */
    H5FD_mpio_collective_opt_t last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO;
    size_t total_chunk;
#ifdef H5D_DEBUG
    /* Declared under the same macro that guards its uses below */
    int mpi_rank;
#endif
    size_t u;
    H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_NO_COLLECTIVE;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_MULTI_CHUNK);

#ifdef H5D_DEBUG
    mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file);
#endif

    H5_CHECKED_ASSIGN(total_chunk, size_t, fm->layout->u.chunk.nchunks, hsize_t);
    HDassert(total_chunk != 0);

    /* Both arrays are filled by H5D__obtain_mpio_mode(); chunk_io_option must
     * start zeroed because only collective chunks get an explicit mode. */
    if(NULL == (chunk_io_option = (uint8_t *)H5MM_calloc(total_chunk)))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk I/O mode buffer")
    if(NULL == (chunk_addr = (haddr_t *)H5MM_calloc(total_chunk * sizeof(haddr_t))))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk address buffer")

#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D), "total_chunk %Zu\n", total_chunk);
#endif

    if(H5D__obtain_mpio_mode(io_info, fm, chunk_io_option, chunk_addr) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "unable to obtain MPIO mode")

    /* Each chunk is transferred as if it were a contiguous dataset of one chunk */
    H5MM_memcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
    ctg_io_info.store = &ctg_store;
    ctg_io_info.layout_ops = *H5D_LOPS_CONTIG;
    ctg_store.contig.dset_size = (hsize_t)io_info->dset->shared->layout.u.chunk.size;

    io_info->store = &store;

    for(u = 0; u < total_chunk; u++) {
        H5D_chunk_info_t *chunk_info;
        H5S_t *fspace;
        H5S_t *mspace;

#ifdef H5D_DEBUG
        if(H5DEBUG(D))
            HDfprintf(H5DEBUG(D),"mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
#endif
        /* NULL when this rank selected nothing in chunk u */
        chunk_info = fm->select_chunk[u];
        if(chunk_info) {
            HDassert(chunk_info->index == u);
            store.chunk.scaled = chunk_info->scaled;
        }

        if(chunk_io_option[u] == H5D_CHUNK_IO_MODE_COL) {
#ifdef H5D_DEBUG
            if(H5DEBUG(D))
                HDfprintf(H5DEBUG(D),"inside collective chunk IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
#endif
            if(chunk_info) {
                fspace = chunk_info->fspace;
                mspace = chunk_info->mspace;
                actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE);
            }
            else {
                fspace = mspace = NULL;
            }

            if(last_coll_opt_mode != H5FD_MPIO_COLLECTIVE_IO) {
                if(H5CX_set_mpio_coll_opt(H5FD_MPIO_COLLECTIVE_IO) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't switch to collective I/O")
                last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO;
            }

            ctg_store.contig.dset_addr = chunk_addr[u];
            if(H5D__inter_collective_io(&ctg_io_info, type_info, fspace, mspace) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO")
        }
        else {
#ifdef H5D_DEBUG
            if(H5DEBUG(D))
                HDfprintf(H5DEBUG(D),"inside independent IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
#endif
            HDassert(chunk_io_option[u] == 0);

            if(chunk_info) {
                fspace = chunk_info->fspace;
                mspace = chunk_info->mspace;
                actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT);
            }
            else {
                fspace = mspace = NULL;
            }

            if(last_coll_opt_mode != H5FD_MPIO_INDIVIDUAL_IO) {
                if(H5CX_set_mpio_coll_opt(H5FD_MPIO_INDIVIDUAL_IO) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't switch to individual I/O")
                last_coll_opt_mode = H5FD_MPIO_INDIVIDUAL_IO;
            }

            ctg_store.contig.dset_addr = chunk_addr[u];
            if(H5D__inter_collective_io(&ctg_io_info, type_info, fspace, mspace) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO")
#ifdef H5D_DEBUG
            if(H5DEBUG(D))
                HDfprintf(H5DEBUG(D),"after inter collective IO\n");
#endif
        }
    }

    H5CX_set_mpio_actual_io_mode(actual_io_mode);

done:
    if(chunk_io_option)
        H5MM_xfree(chunk_io_option);
    if(chunk_addr)
        H5MM_xfree(chunk_addr);

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Perform collective I/O on a filtered chunked dataset one chunk at a time.
 *
 * Read: each entry of this rank's filtered I/O info list is processed by
 * H5D__filtered_collective_chunk_entry_io().
 *
 * Write: the ranks iterate in lock-step for max_num_chunks rounds, where
 * max_num_chunks is the largest per-rank entry count (MPI_MAX reduction).
 * In round i:
 *   - a rank whose i-th entry it owns (owners.new_owner) updates that chunk;
 *   - the round's entries are gathered into collective_chunk_list (assumes the
 *     TRUE argument to H5D__mpio_array_gatherv requests an all-gather);
 *   - file space for every gathered chunk is allocated/resized;
 *   - each owner looks up its chunk's new location in the gathered list and
 *     writes it with a contiguous MPI_BYTE datatype; ranks without a chunk
 *     take part with a zero-length transfer;
 *   - the chunk index is updated with every gathered chunk's new address.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    H5D_chunk_map_t *fm)
{
    H5D_filtered_collective_io_info_t *chunk_list = NULL;            /* This rank's chunk entries */
    H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* Entries gathered in a round */
    H5D_storage_t store;
    H5D_io_info_t ctg_io_info;
    H5D_storage_t ctg_store;
    MPI_Datatype *file_type_array = NULL;
    MPI_Datatype *mem_type_array = NULL;
    hbool_t *file_type_is_derived_array = NULL;
    hbool_t *mem_type_is_derived_array = NULL;
    hbool_t *has_chunk_selected_array = NULL;
    size_t chunk_list_num_entries = 0;
    size_t collective_chunk_list_num_entries = 0;
    size_t max_num_chunks = 0;  /* Function scope so the cleanup code can free the datatypes */
    size_t i, j;
    int mpi_rank, mpi_size, mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(io_info);
    HDassert(type_info);
    HDassert(fm);

    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")

    H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_MULTI_CHUNK);
    H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);

    if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")

    /* Each chunk is written as if it were a contiguous dataset */
    H5MM_memcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
    ctg_io_info.store = &ctg_store;
    ctg_io_info.layout_ops = *H5D_LOPS_CONTIG;
    ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size;
    ctg_store.contig.dset_addr = 0;

    io_info->store = &store;

    if (io_info->op_type == H5D_IO_OP_READ) {
        for (i = 0; i < chunk_list_num_entries; i++)
            if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry")
    }
    else {
        H5D_chk_idx_info_t index_info;
        H5D_chunk_ud_t udata;
        unsigned long long local_num_chunks = (unsigned long long) chunk_list_num_entries;
        unsigned long long global_max_chunks = 0;
        hsize_t mpi_buf_count;

        index_info.f = io_info->dset->oloc.file;
        index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
        index_info.layout = &(io_info->dset->shared->layout.u.chunk);
        index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);

        udata.common.layout = index_info.layout;
        udata.common.storage = index_info.storage;
        udata.filter_mask = 0;

        /* size_t is not necessarily unsigned long long (e.g. 32-bit targets),
         * so reduce through variables whose type matches the MPI datatype. */
        if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_num_chunks, &global_max_chunks,
                1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)

        /* The maximum of size_t values always fits back into a size_t */
        max_num_chunks = (size_t) global_max_chunks;

        if (!(max_num_chunks > 0)) HGOTO_DONE(SUCCEED);

        if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array")
        if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array")
        if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array")
        if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array")

        /* Same size in every round, so allocate once and reuse it */
        if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(hbool_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")

        for (i = 0; i < max_num_chunks; i++) {
            hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner);

            if (have_chunk_to_process)
                if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")

            /* Contribute this round's entry, if any.  Pass NULL when there is
             * nothing to send: i may be past the end of chunk_list, or
             * chunk_list may be NULL, so &chunk_list[i] must not be formed. */
            if (H5D__mpio_array_gatherv(have_chunk_to_process ? &chunk_list[i] : NULL,
                    have_chunk_to_process ? 1 : 0, sizeof(H5D_filtered_collective_io_info_t),
                    (void **) &collective_chunk_list, &collective_chunk_list_num_entries,
                    true, 0, io_info->comm, NULL) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")

            /* Allocate/resize file space for every chunk gathered this round.
             * j indexes collective_chunk_list, so the scaled coordinates must
             * come from that list too (not from this rank's chunk_list). */
            for (j = 0; j < collective_chunk_list_num_entries; j++) {
                hbool_t insert = FALSE;

                if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current,
                        &collective_chunk_list[j].chunk_states.new_chunk, &insert, collective_chunk_list[j].scaled) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
            }

            /* Learn which ranks contributed an entry this round so each owner
             * can find its own entry's position in collective_chunk_list */
            if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array,
                    1, MPI_C_BOOL, io_info->comm)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)

            if (have_chunk_to_process) {
                size_t offset;
                int mpi_type_count;

                /* Entries from lower ranks precede this rank's entry */
                for (j = 0, offset = 0; j < (size_t) mpi_rank; j++)
                    offset += has_chunk_selected_array[j];

                H5MM_memcpy(&chunk_list[i].chunk_states.new_chunk, &collective_chunk_list[offset].chunk_states.new_chunk, sizeof(chunk_list[i].chunk_states.new_chunk));

                H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t);

                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i])))
                    HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
                if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i])))
                    HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
                mem_type_is_derived_array[i] = TRUE;

                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i])))
                    HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
                if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i])))
                    HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
                file_type_is_derived_array[i] = TRUE;

                mpi_buf_count = 1;

                ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset;
                ctg_io_info.u.wbuf = chunk_list[i].buf;
            }
            else {
                /* Nothing to write: take part with a zero-length transfer */
                mem_type_array[i] = file_type_array[i] = MPI_BYTE;
                mpi_buf_count = 0;
            }

            if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, file_type_array[i], mem_type_array[i]) < 0)
                HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")

            /* Record every chunk written this round in the chunk index */
            for (j = 0; j < collective_chunk_list_num_entries; j++) {
                udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk;
                udata.common.scaled = collective_chunk_list[j].scaled;
                udata.chunk_idx = collective_chunk_list[j].index;

                if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
            }

            if (collective_chunk_list){
                H5MM_free(collective_chunk_list);
                collective_chunk_list = NULL;
            }
        }
    }

done:
    /* Free derived MPI datatypes here, not at the end of the write loop, so
     * they are also released when a round fails part-way through. */
    for (i = 0; i < max_num_chunks; i++) {
        if (file_type_is_derived_array && file_type_is_derived_array[i]) {
            if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i])))
                HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
        }
        if (mem_type_is_derived_array && mem_type_is_derived_array[i]) {
            if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i])))
                HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
        }
    }

    if (chunk_list) {
        for (i = 0; i < chunk_list_num_entries; i++)
            if (chunk_list[i].buf)
                H5MM_free(chunk_list[i].buf);

        H5MM_free(chunk_list);
    }

    if (collective_chunk_list)
        H5MM_free(collective_chunk_list);
    if (has_chunk_selected_array)
        H5MM_free(has_chunk_selected_array);
    if (file_type_array)
        H5MM_free(file_type_array);
    if (mem_type_array)
        H5MM_free(mem_type_array);
    if (file_type_is_derived_array)
        H5MM_free(file_type_is_derived_array);
    if (mem_type_is_derived_array)
        H5MM_free(mem_type_is_derived_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Build MPI datatypes for FILE_SPACE and MEM_SPACE and pass them to
 * H5D__final_collective_io().  If either dataspace is NULL (this rank has
 * nothing to transfer), a zero-count MPI_BYTE transfer is issued instead, so
 * the rank still makes the call.  Any derived datatypes created here are
 * freed before returning.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    const H5S_t *file_space, const H5S_t *mem_space)
{
    MPI_Datatype file_type, buf_type;       /* Datatypes handed to the MPI-IO layer */
    int buf_count;                          /* Count of buf_type elements */
    hbool_t buf_type_derived = FALSE;       /* Whether buf_type must be freed */
    hbool_t file_type_derived = FALSE;      /* Whether file_type must be freed */
    int mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    if((NULL == file_space) || (NULL == mem_space)) {
        /* No selection on this rank: zero-length transfer */
        buf_type = MPI_BYTE;
        file_type = MPI_BYTE;
        buf_count = 0;
        buf_type_derived = FALSE;
        file_type_derived = FALSE;
    }
    else {
        hsize_t *permute_map = NULL;
        hbool_t is_permuted = FALSE;
        int file_count;

        /* File type is built first with permutation enabled (TRUE); when the
         * selection is permuted, the map it produces must be non-NULL ... */
        if(H5S_mpio_space_type(file_space, type_info->src_type_size,
                &file_type, &file_count, &file_type_derived,
                TRUE, &permute_map, &is_permuted) < 0)
            HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI file type")
        HDassert(!is_permuted || permute_map);

        /* ... and the memory-type call (FALSE) is expected to consume it,
         * leaving permute_map NULL again. */
        if(H5S_mpio_space_type(mem_space, type_info->src_type_size,
                &buf_type, &buf_count, &buf_type_derived,
                FALSE, &permute_map, &is_permuted) < 0)
            HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI buffer type")
        HDassert(!is_permuted || !permute_map);
    }

#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D),"before final collective IO \n");
#endif

    if(H5D__final_collective_io(io_info, type_info, (hsize_t)buf_count, file_type, buf_type) < 0)
        HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish collective MPI-IO")

done:
    if(buf_type_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&buf_type)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
    if(file_type_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)

#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D),"before leaving inter_collective_io ret_value = %d\n",ret_value);
#endif

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Install MPI_BUF_TYPE / MPI_FILE_TYPE as the collective I/O datatypes in the
 * API context, then run the dataset's single_read or single_write callback
 * (chosen by io_info->op_type) for MPI_BUF_COUNT elements.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__final_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    hsize_t mpi_buf_count, MPI_Datatype mpi_file_type, MPI_Datatype mpi_buf_type)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    /* NOTE(review): the lower I/O layer presumably reads these datatypes
     * back from the API context; confirm against H5CX/H5FD_mpio. */
    if(H5CX_set_mpi_coll_datatypes(mpi_buf_type, mpi_file_type) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O collective I/O datatypes")

    if(io_info->op_type != H5D_IO_OP_WRITE) {
        if((io_info->io_ops.single_read)(io_info, type_info, mpi_buf_count, NULL, NULL) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "optimized read failed")
    }
    else if((io_info->io_ops.single_write)(io_info, type_info, mpi_buf_count, NULL, NULL) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "optimized write failed")

done:
#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D),"ret_value before leaving final_collective_io=%d\n",ret_value);
#endif

    FUNC_LEAVE_NOAPI(ret_value)
}
static int
H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
{
haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF;
FUNC_ENTER_STATIC_NOERR
addr1 = ((const H5D_chunk_addr_info_t *)chunk_addr_info1)->chunk_addr;
addr2 = ((const H5D_chunk_addr_info_t *)chunk_addr_info2)->chunk_addr;
FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
}
static int
H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
{
haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF;
FUNC_ENTER_STATIC_NOERR
addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->chunk_states.new_chunk.offset;
addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->chunk_states.new_chunk.offset;
FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
}
#if MPI_VERSION >= 3
/*
 * qsort() comparator: orders H5D_filtered_collective_io_info_t entries by
 * owners.original_owner (the rank that contributed the entry), ascending.
 * Returns a negative, zero or positive value.  Uses (a > b) - (a < b)
 * rather than a - b, so the result cannot overflow for any int values.
 */
static int
H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
{
    int owner1 = -1, owner2 = -1;

    FUNC_ENTER_STATIC_NOERR

    owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.original_owner;
    owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.original_owner;

    FUNC_LEAVE_NOAPI((owner1 > owner2) - (owner1 < owner2))
}
#endif
/*
 * Fill CHUNK_ADDR_INFO_ARRAY with the file address and chunk info of every
 * chunk in fm->sel_chunks, and sort it by address if it is not already in
 * ascending order.
 *
 * Addresses are obtained one of two ways:
 *   - H5D_OBTAIN_ONE_CHUNK_ADDR_IND: one H5D__chunk_lookup() per chunk;
 *   - H5D_OBTAIN_ALL_CHUNK_ADDR_COL: rank 0 builds the whole address map with
 *     H5D__chunk_addrmap() and broadcasts it.  This is chosen when more than
 *     H5D_ALL_CHUNK_ADDR_THRES_COL percent of all (chunk, rank) pairs are
 *     selected AND sum_chunk / mpi_size >= H5D_ALL_CHUNK_ADDR_THRES_COL_NUM.
 *     Every rank computes the same decision from the same inputs, so the
 *     broadcast is entered consistently.
 *
 * sum_chunk is the total number of selected chunks summed over all ranks
 * (NOTE(review): inferred from its use in the threshold; confirm at caller).
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
    H5D_chunk_addr_info_t chunk_addr_info_array[], int sum_chunk)
{
    H5SL_node_t *chunk_node;
    H5D_chunk_info_t *chunk_info;
    haddr_t chunk_addr;
    haddr_t *total_chunk_addr_array = NULL;
    hbool_t do_sort = FALSE;
    int bsearch_coll_chunk_threshold;
    int many_chunk_opt = H5D_OBTAIN_ONE_CHUNK_ADDR_IND;
    int mpi_size;
    int mpi_code;
    int i;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")

    /* Percentage of all (chunk, rank) pairs that are selected.  Computed in
     * hsize_t: both sum_chunk * 100 and nchunks * mpi_size can exceed INT_MAX
     * (signed overflow is undefined behavior) for large datasets. */
    {
        hsize_t num_chunk_rank_pairs = fm->layout->u.chunk.nchunks * (hsize_t)mpi_size;

        HDassert(sum_chunk >= 0);
        HDassert(num_chunk_rank_pairs > 0);
        bsearch_coll_chunk_threshold = (int)(((hsize_t)sum_chunk * 100) / num_chunk_rank_pairs);
    }

    if((bsearch_coll_chunk_threshold > H5D_ALL_CHUNK_ADDR_THRES_COL)
            && ((sum_chunk / mpi_size) >= H5D_ALL_CHUNK_ADDR_THRES_COL_NUM))
        many_chunk_opt = H5D_OBTAIN_ALL_CHUNK_ADDR_COL;

#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D), "many_chunk_opt= %d\n", many_chunk_opt);
#endif

    if(many_chunk_opt != H5D_OBTAIN_ONE_CHUNK_ADDR_IND) {
        int mpi_rank;

#ifdef H5D_DEBUG
        if(H5DEBUG(D))
            HDfprintf(H5DEBUG(D), "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL\n");
#endif
        /* The address table is broadcast as one MPI_BYTE message whose count
         * is an int; reject sizes that do not fit.  All ranks evaluate the
         * same values, so they all take this exit together. */
        if(sizeof(haddr_t) * fm->layout->u.chunk.nchunks > (hsize_t)INT_MAX)
            HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "result overflow")

        if(NULL == (total_chunk_addr_array = (haddr_t *)H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks)))
            HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory chunk address array")

        if((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
            HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")

        if(mpi_rank == 0) {
            if(H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
        }

        if(MPI_SUCCESS != (mpi_code = MPI_Bcast(total_chunk_addr_array, (int)(sizeof(haddr_t) * fm->layout->u.chunk.nchunks), MPI_BYTE, (int)0, io_info->comm)))
            HMPI_GOTO_ERROR(FAIL, "MPI_BCast failed", mpi_code)
    }

    i = 0;
    if(NULL == (chunk_node = H5SL_first(fm->sel_chunks)))
        HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL,"couldn't get chunk node from skipped list")

    while(chunk_node) {
        if(NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node)))
            HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL,"couldn't get chunk info from skipped list")

        if(many_chunk_opt == H5D_OBTAIN_ONE_CHUNK_ADDR_IND) {
            H5D_chunk_ud_t udata;

            if(H5D__chunk_lookup(io_info->dset, chunk_info->scaled, &udata) < 0)
                HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL, "couldn't get chunk info from skipped list")
            chunk_addr = udata.chunk_block.offset;
        }
        else
            chunk_addr = total_chunk_addr_array[chunk_info->index];

        /* Only sort if the skip-list order is not already address order */
        if(i > 0 && chunk_addr < chunk_addr_info_array[i - 1].chunk_addr)
            do_sort = TRUE;

        chunk_addr_info_array[i].chunk_addr = chunk_addr;
        chunk_addr_info_array[i].chunk_info = *chunk_info;

        i++;
        chunk_node = H5SL_next(chunk_node);
    }

#ifdef H5D_DEBUG
    if(H5DEBUG(D))
        HDfprintf(H5DEBUG(D), "before Qsort\n");
#endif

    if(do_sort) {
        size_t num_chunks = H5SL_count(fm->sel_chunks);

        HDqsort(chunk_addr_info_array, num_chunks, sizeof(chunk_addr_info_array[0]), H5D__cmp_chunk_addr);
    }

done:
    if(total_chunk_addr_array)
        H5MM_xfree(total_chunk_addr_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Decide, for every chunk of the dataset, whether it is accessed collectively
 * and collect every chunk's file address.
 *
 * On return, assign_io_mode[ic] is H5D_CHUNK_IO_MODE_COL for collective
 * chunks; other entries are left unchanged (the caller zero-fills them).
 * chunk_addr[ic] holds the chunk address.  Both arrays hold
 * fm->layout->u.chunk.nchunks entries.
 *
 * If the context's chunk-opt ratio is 0, every chunk is collective and each
 * rank builds the address map itself.  Otherwise each rank marks the chunks
 * it selected, rank 0 gathers the marks and counts the ranks per chunk.  A
 * chunk is marked collective when that count exceeds
 * MAX(1, mpi_size * ratio / 100).  Rank 0 then broadcasts the modes and its
 * address map in one buffer.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
    uint8_t assign_io_mode[], haddr_t chunk_addr[])
{
    size_t total_chunks;
    unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk;
    uint8_t* io_mode_info = NULL;       /* This rank's per-chunk selection marks */
    uint8_t* recv_io_mode_info = NULL;  /* All ranks' marks (root only) */
    uint8_t* mergebuf = NULL;           /* Broadcast buffer: modes then addresses */
    uint8_t* tempbuf;                   /* Address part of mergebuf */
    H5SL_node_t* chunk_node;
    H5D_chunk_info_t* chunk_info;
    int mpi_size, mpi_rank;
    MPI_Comm comm;
    int root;
    size_t ic;
    int mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    root = 0;
    comm = io_info->comm;

    if((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
    if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")

    H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);

    if(H5CX_get_mpio_chunk_opt_ratio(&percent_nproc_per_chunk) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't get percent nproc per chunk")

    /* Ratio 0: every chunk is collective, no communication needed */
    if(0 == percent_nproc_per_chunk) {
        if(H5D__chunk_addrmap(io_info, chunk_addr) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
        for(ic = 0; ic < total_chunks; ic++)
            assign_io_mode[ic] = H5D_CHUNK_IO_MODE_COL;

        HGOTO_DONE(SUCCEED)
    }

    /* The broadcast below sends (1 + sizeof(haddr_t)) bytes per chunk as one
     * MPI_BYTE message with an int count.  Check the size before allocating
     * or communicating anything, in a form that cannot itself wrap around.
     * This also bounds the int counts used by MPI_Gather.  All ranks evaluate
     * the same values, so they fail together. */
    if(total_chunks > (size_t)INT_MAX / (sizeof(haddr_t) + 1))
        HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "result overflow")

    threshold_nproc_per_chunk = (unsigned)mpi_size * percent_nproc_per_chunk/100;

    if(NULL == (io_mode_info = (uint8_t *)H5MM_calloc(total_chunks)))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate I/O mode info buffer")
    if(NULL == (mergebuf = (uint8_t *)H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks)))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mergebuf buffer")
    /* Addresses follow the mode bytes (possibly unaligned; only memcpy'd) */
    tempbuf = mergebuf + total_chunks;
    if(mpi_rank == root)
        if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * (size_t)mpi_size)))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate recv I/O mode info buffer")

    /* Mark the chunks this rank selected */
    chunk_node = H5SL_first(fm->sel_chunks);
    while(chunk_node) {
        chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node);

        io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG;
        chunk_node = H5SL_next(chunk_node);
    }

    if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, (int)total_chunks, MPI_BYTE,
            recv_io_mode_info, (int)total_chunks, MPI_BYTE, root, comm)))
        HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code)

    if(mpi_rank == root) {
        size_t nproc;
        unsigned* nproc_per_chunk;

        if(NULL == (nproc_per_chunk = (unsigned*)H5MM_calloc(total_chunks * sizeof(unsigned))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate nproc_per_chunk buffer")

        if(H5D__chunk_addrmap(io_info, chunk_addr) < 0) {
            H5MM_free(nproc_per_chunk);
            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
        }

        /* Count how many ranks selected each chunk */
        for(nproc = 0; nproc < (size_t)mpi_size; nproc++) {
            uint8_t *tmp_recv_io_mode_info = recv_io_mode_info + (nproc * total_chunks);

            for(ic = 0; ic < total_chunks; ic++, tmp_recv_io_mode_info++) {
                if(*tmp_recv_io_mode_info != 0) {
                    nproc_per_chunk[ic]++;
                }
            }
        }

        for(ic = 0; ic < total_chunks; ic++) {
            if(nproc_per_chunk[ic] > MAX(1, threshold_nproc_per_chunk)) {
                assign_io_mode[ic] = H5D_CHUNK_IO_MODE_COL;
            }
        }

        H5MM_memcpy(mergebuf, assign_io_mode, total_chunks);
        H5MM_memcpy(tempbuf, chunk_addr, sizeof(haddr_t) * total_chunks);

        H5MM_free(nproc_per_chunk);
    }

    /* Size already validated against INT_MAX above */
    if(MPI_SUCCESS != (mpi_code = MPI_Bcast(mergebuf, (int)((sizeof(haddr_t) + 1) * total_chunks), MPI_BYTE, root, comm)))
        HMPI_GOTO_ERROR(FAIL, "MPI_BCast failed", mpi_code)

    H5MM_memcpy(assign_io_mode, mergebuf, total_chunks);
    H5MM_memcpy(chunk_addr, tempbuf, sizeof(haddr_t) * total_chunks);

#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
{
    hbool_t coll_op = FALSE;

    for(ic = 0; ic < total_chunks; ic++)
        if(assign_io_mode[ic] == H5D_CHUNK_IO_MODE_COL) {
            if(H5CX_test_set_mpio_coll_chunk_multi_ratio_coll(0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
            coll_op = TRUE;
            break;
        }

    if(!coll_op)
        if(H5CX_test_set_mpio_coll_chunk_multi_ratio_ind(0) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value")
}
#endif

done:
    if(io_mode_info)
        H5MM_free(io_mode_info);
    if(mergebuf)
        H5MM_free(mergebuf);
    if(recv_io_mode_info) {
        HDassert(mpi_rank == root);
        H5MM_free(recv_io_mode_info);
    }

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Build this rank's list of H5D_filtered_collective_io_info_t entries, one
 * per chunk in fm->sel_chunks.  For each entry it records the chunk's current
 * location (via H5D__chunk_lookup), this rank as both original and new owner,
 * the number of bytes this rank transfers (io_size), and whether that covers
 * the whole chunk (full_overwrite).
 *
 * For writes, H5D__chunk_redistribute_shared_chunks() then reassigns
 * ownership of shared chunks and may reduce the entry count (MPI-3 only;
 * without MPI-3 a write fails here).
 *
 * On success, *chunk_list takes ownership of the array (NULL when no chunks
 * are selected) and *num_entries holds its entry count.  On failure neither
 * output is written and the array is freed here.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries)
{
    H5D_filtered_collective_io_info_t *local_info_array = NULL;
    size_t num_chunks_selected;
    size_t i;
    int mpi_rank;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(io_info);
    HDassert(type_info);
    HDassert(fm);
    HDassert(chunk_list);
    HDassert(num_entries);

    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")

    if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
        H5D_chunk_info_t *chunk_info;
        H5D_chunk_ud_t udata;
        H5SL_node_t *chunk_node;
        hsize_t select_npoints;
        hssize_t chunk_npoints;

        if(NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(H5D_filtered_collective_io_info_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")

        chunk_node = H5SL_first(fm->sel_chunks);
        for(i = 0; chunk_node; i++) {
            chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);

            if(H5D__chunk_lookup(io_info->dset, chunk_info->scaled, &udata) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")

            local_info_array[i].index = chunk_info->index;
            /* Until redistribution/allocation, the new location is the current one */
            local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = udata.chunk_block;
            local_info_array[i].num_writers = 0;
            local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank;
            local_info_array[i].buf = NULL;

            local_info_array[i].async_info.num_receive_requests = 0;
            local_info_array[i].async_info.receive_buffer_array = NULL;
            local_info_array[i].async_info.receive_requests_array = NULL;

            H5MM_memcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled));

            select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
            local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size;

            /* The selection overwrites the whole chunk when it covers at least
             * as many bytes as the chunk's extent holds */
            if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
            local_info_array[i].full_overwrite =
                    (local_info_array[i].io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;

            chunk_node = H5SL_next(chunk_node);
        }
    }

    if (io_info->op_type == H5D_IO_OP_WRITE) {
#if MPI_VERSION >= 3
        if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, &num_chunks_selected) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks")
#else
        HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks - MPI version < 3 (MPI_Mprobe and MPI_Imrecv missing)")
#endif
    }

    *chunk_list = local_info_array;
    *num_entries = num_chunks_selected;

done:
    /* On failure the array never reaches the caller, so release it here.
     * Per-entry async receive buffers are deliberately not freed: MPI may
     * still have receives posted into them. */
    if (ret_value < 0 && local_info_array)
        H5MM_free(local_info_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
#if MPI_VERSION >= 3
/*
 * Give every chunk that several ranks write to a single owning rank, and
 * start sending the other ranks' modifications to that owner (write path;
 * needs MPI-3 for MPI_Mprobe/MPI_Imrecv).
 *
 * 1. All ranks' entries are gathered on rank 0 (a comparator on
 *    chunk_states.new_chunk.offset is passed, presumably to sort by address).
 * 2. Rank 0 walks runs of entries with equal chunk_states.chunk_current.offset.
 *    For each run, the new owner is the participating rank with the fewest
 *    chunks assigned so far (first entry's rank on ties), and num_writers is
 *    the run length.  Rank 0 then sorts by original owner and scatters each
 *    rank's entries back to it.
 * 3. A non-owner encodes the chunk's file dataspace followed by its gathered
 *    write data and MPI_Isend()s it to the owner with tag = chunk index.
 *    An owner posts num_writers - 1 MPI_Mprobe/MPI_Imrecv receives on that
 *    tag, kept in the entry's async_info.
 * 4. local_chunk_array is compacted to the chunks this rank owns, and
 *    *local_chunk_array_num_entries is updated.  All sends are waited on
 *    before returning.
 *
 * Returns SUCCEED/FAIL.
 */
static herr_t
H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
    const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries)
{
    H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL;
    H5S_sel_iter_t *mem_iter = NULL;
    unsigned char **mod_data = NULL;        /* Per-send encoded modification buffers */
    MPI_Request *send_requests = NULL;
    MPI_Status *send_statuses = NULL;
    hbool_t mem_iter_init = FALSE;
    size_t shared_chunks_info_array_num_entries = 0;
    size_t num_send_requests = 0;
    size_t mod_data_num_entries = 0;        /* Number of slots in mod_data */
    size_t *num_assigned_chunks_array = NULL;
    size_t i, last_assigned_idx;
    int *send_counts = NULL;
    int *send_displacements = NULL;
    int scatter_recvcount_int;
    int mpi_rank, mpi_size, mpi_code;
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(io_info);
    HDassert(type_info);
    HDassert(fm);
    HDassert(local_chunk_array_num_entries);

    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")

    H5CX_set_libver_bounds(NULL);

    if (*local_chunk_array_num_entries)
        if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(*local_chunk_array_num_entries * sizeof(MPI_Request))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")

    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")

    if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(H5D_filtered_collective_io_info_t),
            (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, false, 0,
            io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")

    if (mpi_rank == 0) {
        if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(int))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
        if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
        if (NULL == (num_assigned_chunks_array = (size_t *) H5MM_calloc((size_t) mpi_size * sizeof(size_t))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate number of assigned chunks array")

        /* Each iteration handles one run of entries for the same chunk */
        for (i = 0; i < shared_chunks_info_array_num_entries;) {
            H5D_filtered_collective_io_info_t chunk_entry;
            haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
            size_t set_begin_index = i;
            size_t num_writers = 0;
            int new_chunk_owner = shared_chunks_info_array[i].owners.original_owner;

            do {
                chunk_entry = shared_chunks_info_array[i];

                /* Each entry is scattered back to the rank that contributed it */
                send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);

                /* Prefer the writer with the fewest chunks assigned so far */
                if (num_assigned_chunks_array[chunk_entry.owners.original_owner] < num_assigned_chunks_array[new_chunk_owner])
                    new_chunk_owner = chunk_entry.owners.original_owner;

                num_writers++;
            } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);

            for (; set_begin_index < i; set_begin_index++) {
                shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
                shared_chunks_info_array[set_begin_index].num_writers = num_writers;
            }

            num_assigned_chunks_array[new_chunk_owner]++;
        }

        /* Group entries by contributing rank for MPI_Scatterv */
        if(shared_chunks_info_array_num_entries > 1)
            HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
                    sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry_owner);

        send_displacements[0] = 0;
        for (i = 1; i < (size_t) mpi_size; i++)
            send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
    }

    H5_CHECKED_ASSIGN(scatter_recvcount_int, int, *local_chunk_array_num_entries * sizeof(H5D_filtered_collective_io_info_t), size_t);
    if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts, send_displacements,
            MPI_BYTE, local_chunk_array, scatter_recvcount_int, MPI_BYTE, 0, io_info->comm)))
        HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)

    if (shared_chunks_info_array) {
        H5MM_free(shared_chunks_info_array);
        shared_chunks_info_array = NULL;
    }

    if (*local_chunk_array_num_entries) {
        /* Zero-filled so the cleanup code can tell which slots hold a buffer */
        if (NULL == (mod_data = (unsigned char **) H5MM_calloc(*local_chunk_array_num_entries * sizeof(unsigned char *))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
        mod_data_num_entries = *local_chunk_array_num_entries;
    }

    for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) {
        H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i];

        if (mpi_rank != chunk_entry->owners.new_owner) {
            H5D_chunk_info_t *chunk_info = NULL;
            unsigned char *mod_data_p = NULL;
            hsize_t iter_nelmts;
            size_t mod_data_size;

            if(NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
                HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")

            /* First call (NULL buffer) only reports the encoded size */
            if(H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")

            iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);

            H5_CHECK_OVERFLOW(iter_nelmts, hsize_t, size_t);
            mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;

            if(NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")

            /* Encode the dataspace, then append the gathered write data.
             * NOTE(review): relies on H5S_encode advancing mod_data_p past
             * the encoded dataspace. */
            mod_data_p = mod_data[num_send_requests];
            if(H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")

            if(H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size, 0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
            mem_iter_init = TRUE;

            if(0 == H5D__gather_mem(io_info->u.wbuf, mem_iter, (size_t)iter_nelmts, mod_data_p))
                HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")

            H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
            H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
            if(MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
                    chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
                HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)

            /* Count the send as soon as it has been posted */
            num_send_requests++;

            if(mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
            mem_iter_init = FALSE;
        }
        else {
            /* This rank owns the chunk: post receives for the other writers */
            if (chunk_entry->num_writers > 1) {
                MPI_Message message;
                MPI_Status status;
                size_t j;

                chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
                if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")

                if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")

                for (j = 0; j < chunk_entry->num_writers - 1; j++) {
                    int count = 0;

                    if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
                        HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)

                    if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
                        HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)

                    HDassert(count >= 0);
                    /* count is in MPI_BYTE units, so it is the byte size */
                    if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc((size_t) count)))
                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")

                    if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
                            &message, &chunk_entry->async_info.receive_requests_array[j])))
                        HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
                }
            }

            /* Keep only owned chunks, compacted to the front */
            local_chunk_array[last_assigned_idx++] = local_chunk_array[i];
        }
    }

    *local_chunk_array_num_entries = last_assigned_idx;

    if (num_send_requests) {
        if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(MPI_Status))))
            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")

        H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
        if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
    }

done:
    /* Free every allocated send buffer, including one whose send was never
     * posted because of an error.
     * NOTE(review): on the error path, buffers of already-posted MPI_Isend
     * requests are still freed without waiting (pre-existing behavior). */
    for (i = 0; i < mod_data_num_entries; i++) {
        if (mod_data[i])
            H5MM_free(mod_data[i]);
    }

    if (send_requests)
        H5MM_free(send_requests);
    if (send_statuses)
        H5MM_free(send_statuses);
    if (send_counts)
        H5MM_free(send_counts);
    if (send_displacements)
        H5MM_free(send_displacements);
    if (mod_data)
        H5MM_free(mod_data);
    if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
        HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
    if (mem_iter)
        H5MM_free(mem_iter);
    if (num_assigned_chunks_array)
        H5MM_free(num_assigned_chunks_array);
    if (shared_chunks_info_array)
        H5MM_free(shared_chunks_info_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
#endif
/*
 * Build the pair of MPI derived datatypes used to write a list of filtered
 * chunks with a single collective write.
 *
 * chunk_list        - chunks to write; this routine sorts the array IN PLACE
 *                     with H5D__cmp_filtered_collective_io_info_entry
 *                     (presumably ascending file offset -- see comparator)
 * num_entries       - number of entries in chunk_list
 * new_mem_type      - out: hindexed MPI_BYTE type whose blocks are the chunk
 *                     buffers, expressed as displacements relative to
 *                     chunk_list[0].buf (after sorting)
 * mem_type_derived  - out: set TRUE as soon as *new_mem_type has been created,
 *                     so the caller knows it must be freed even on failure
 * new_file_type     - out: hindexed MPI_BYTE type whose blocks are located at
 *                     each chunk's new file offset
 * file_type_derived - out: set TRUE as soon as *new_file_type has been created
 *
 * When num_entries is 0 nothing is created and the outputs are not touched.
 *
 * Returns SUCCEED on success, FAIL on failure (including any chunk whose
 * length does not fit in an MPI int block length).
 */
static herr_t
H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list,
    size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
    MPI_Datatype *new_file_type, hbool_t *file_type_derived)
{
    MPI_Aint *write_buf_array = NULL;   /* Chunk buffer displacements relative to the first buffer */
    MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
    int      *length_array = NULL;      /* Filtered chunk lengths, in bytes */
    herr_t    ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    /* chunk_list is only dereferenced when there is at least one entry */
    HDassert(chunk_list || 0 == num_entries);
    HDassert(new_mem_type);
    HDassert(mem_type_derived);
    HDassert(new_file_type);
    HDassert(file_type_derived);

    if (num_entries > 0) {
        size_t i;
        int    mpi_code;
        void  *base_buf;

        H5_CHECK_OVERFLOW(num_entries, size_t, int);

        if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int))))
            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array")
        if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf length array")
        if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array")

        /* Order the chunks before building the types; note this reorders the caller's array */
        HDqsort(chunk_list, num_entries, sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry);

        /* All memory displacements are taken relative to the first (sorted) chunk buffer */
        base_buf = chunk_list[0].buf;

        for (i = 0; i < num_entries; i++) {
            /* MPI block lengths are ints: reject chunks that would not fit instead
             * of letting the conversion produce a wrapped/negative length */
            if (chunk_list[i].chunk_states.new_chunk.length > (hsize_t) INT_MAX)
                HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "filtered chunk too large for MPI datatype block length")

            file_offset_array[i] = (MPI_Aint) chunk_list[i].chunk_states.new_chunk.offset;
            length_array[i] = (int) chunk_list[i].chunk_states.new_chunk.length;

            /* Chunk buffers are separate allocations, so the displacement is
             * computed on integer addresses rather than by pointer subtraction */
            write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
        }

        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
        *mem_type_derived = TRUE;
        if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)

        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
        *file_type_derived = TRUE;
        if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
    }

done:
    if(write_buf_array)
        H5MM_free(write_buf_array);
    if(file_offset_array)
        H5MM_free(file_offset_array);
    if(length_array)
        H5MM_free(length_array);

    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * Perform the read or write of a single filtered chunk on the collective
 * filtered I/O path.
 *
 * chunk_entry - chunk to operate on. A new chunk_entry->buf is allocated here.
 *               On success it holds the chunk data, and
 *               chunk_entry->chunk_states.new_chunk.length holds its size in
 *               bytes. For a write, buf holds the re-filtered data. The
 *               async_info receive arrays are released and reset to NULL
 *               before returning.
 * io_info     - I/O operation information; op_type selects read or write
 * type_info   - datatype information (element sizes)
 * fm          - chunk map; the chunk is looked up in fm->sel_chunks using
 *               chunk_entry->index
 *
 * Steps:
 *  1. Unless this is a write that fully overwrites the chunk, read the
 *     current chunk from the file with independent I/O and unfilter it.
 *  2. Read: gather the file-space selection out of the chunk buffer and
 *     scatter it into the application's read buffer.
 *     Write: gather this rank's data from the application's write buffer
 *     into the chunk buffer. Then wait for the modification messages posted
 *     on async_info by other ranks and apply each one. Each message is an
 *     encoded dataspace followed by its data. Finally, run the filter
 *     pipeline forward again.
 *
 * NOTE(review): if an error occurs before the receives have been waited on,
 * their buffers are deliberately not freed here. MPI may still write into
 * them.
 *
 * Returns SUCCEED on success, FAIL on failure.
 */
static herr_t
H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
    const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm)
{
    H5D_chunk_info_t *chunk_info = NULL;
    H5S_sel_iter_t   *mem_iter = NULL;
    H5S_sel_iter_t   *file_iter = NULL;
    H5Z_EDC_t         err_detect;
    H5Z_cb_t          filter_cb;
    H5FD_mpio_xfer_t  xfer_mode = H5FD_MPIO_INDEPENDENT;  /* Caller's transfer mode, saved for restore */
    unsigned          filter_mask = 0;
    hsize_t           iter_nelmts;
    hssize_t          extent_npoints;
    hsize_t           true_chunk_size;
    hbool_t           mem_iter_init = FALSE;
    hbool_t           file_iter_init = FALSE;
    hbool_t           xfer_mode_changed = FALSE;  /* TRUE while xfer_mode still has to be restored */
    hbool_t           receives_complete = FALSE;  /* TRUE once all posted receives have completed */
    size_t            buf_size;
    size_t            nbytes;                     /* size_t view of the chunk length for H5Z_pipeline */
    size_t            i;
    H5S_t            *dataspace = NULL;
    void             *tmp_gath_buf = NULL;
    int               mpi_code;
    herr_t            ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(chunk_entry);
    HDassert(io_info);
    HDassert(type_info);
    HDassert(fm);

    if(H5CX_get_err_detect(&err_detect) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info")
    if(H5CX_get_filter_cb(&filter_cb) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function")

    if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
        HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")

    if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
    true_chunk_size = (hsize_t) extent_npoints * type_info->src_type_size;

    /* Big enough for both the stored (filtered) chunk and the full unfiltered chunk */
    buf_size = MAX(chunk_entry->chunk_states.chunk_current.length, true_chunk_size);
    if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer")

    if(!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
        chunk_entry->chunk_states.new_chunk.length = chunk_entry->chunk_states.chunk_current.length;

        /* Read the existing chunk with independent I/O. If the read fails, the
         * cleanup code restores the caller's transfer mode. */
        if(H5CX_get_io_xfer_mode(&xfer_mode) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode")
        if(H5CX_set_io_xfer_mode(H5FD_MPIO_INDEPENDENT) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O transfer mode")
        xfer_mode_changed = TRUE;

        if(H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->chunk_states.chunk_current.offset,
                chunk_entry->chunk_states.new_chunk.length, chunk_entry->buf) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "unable to read raw data chunk")

        xfer_mode_changed = FALSE;
        if(H5CX_set_io_xfer_mode(xfer_mode) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O transfer mode")

        /* H5Z_pipeline takes a size_t length. Pass a local copy instead of
         * type-punning the hsize_t field, which is wrong wherever size_t is
         * narrower than hsize_t. */
        nbytes = (size_t) chunk_entry->chunk_states.new_chunk.length;
        if(H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
                &filter_mask, err_detect, filter_cb, &nbytes,
                &buf_size, &chunk_entry->buf) < 0)
            HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
        chunk_entry->chunk_states.new_chunk.length = (hsize_t) nbytes;
    }
    else {
        chunk_entry->chunk_states.new_chunk.length = true_chunk_size;
    }

    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
    if(H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size, 0) < 0)
        HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
    mem_iter_init = TRUE;

    switch(io_info->op_type) {
        case H5D_IO_OP_READ:
            if(NULL == (file_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file iterator")
            if(H5S_select_iter_init(file_iter, chunk_info->fspace, type_info->src_type_size, 0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
            file_iter_init = TRUE;

            /* Chunk buffer --(file selection)--> temp buffer --(memory selection)--> read buffer */
            iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
            if(NULL == (tmp_gath_buf = H5MM_malloc(iter_nelmts * type_info->src_type_size)))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
            if(!H5D__gather_mem(chunk_entry->buf, file_iter, (size_t) iter_nelmts, tmp_gath_buf))
                HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't gather from chunk buffer")

            iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
            if(H5D__scatter_mem(tmp_gath_buf, mem_iter, (size_t) iter_nelmts, io_info->u.rbuf) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
            break;

        case H5D_IO_OP_WRITE:
            /* Write buffer --(memory selection)--> temp buffer --(file selection)--> chunk buffer */
            iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
            if(NULL == (tmp_gath_buf = H5MM_malloc(iter_nelmts * type_info->src_type_size)))
                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
            if(0 == H5D__gather_mem(io_info->u.wbuf, mem_iter, (size_t) iter_nelmts, tmp_gath_buf))
                HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")

            if(H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
            mem_iter_init = FALSE;

            if(H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size, 0) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information")
            mem_iter_init = TRUE;

            iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
            if(H5D__scatter_mem(tmp_gath_buf, mem_iter, (size_t) iter_nelmts, chunk_entry->buf) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer")

            if(H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
            mem_iter_init = FALSE;

            /* Wait for the modification messages other ranks sent for this chunk */
            if(MPI_SUCCESS != (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests,
                    chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE)))
                HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
            receives_complete = TRUE;

            /* Apply each message: an encoded dataspace followed by the data for that selection */
            for(i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) {
                const unsigned char *mod_data_p;

                mod_data_p = chunk_entry->async_info.receive_buffer_array[i];
                /* Decoding advances mod_data_p past the encoded dataspace */
                if(NULL == (dataspace = H5S_decode(&mod_data_p)))
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")

                if(H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size, 0) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
                mem_iter_init = TRUE;

                iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace);
                if(H5D__scatter_mem(mod_data_p, mem_iter, (size_t) iter_nelmts, chunk_entry->buf) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")

                if(H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
                mem_iter_init = FALSE;

                if(dataspace) {
                    if(H5S_close(dataspace) < 0)
                        HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
                    dataspace = NULL;
                }

                /* Mark as consumed so the cleanup code does not free it again */
                H5MM_free(chunk_entry->async_info.receive_buffer_array[i]);
                chunk_entry->async_info.receive_buffer_array[i] = NULL;
            }

            /* Re-filter the modified chunk (same size_t-local pattern as the reverse pass) */
            nbytes = (size_t) chunk_entry->chunk_states.new_chunk.length;
            if(H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
                    err_detect, filter_cb, &nbytes,
                    &buf_size, &chunk_entry->buf) < 0)
                HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed")
            chunk_entry->chunk_states.new_chunk.length = (hsize_t) nbytes;

#if H5_SIZEOF_SIZE_T > 4
            if (chunk_entry->chunk_states.new_chunk.length > ((size_t) 0xffffffff))
                HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
#endif
            break;

        default:
            HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid I/O operation")
    }

done:
    /* The independent chunk read failed before the transfer mode was restored */
    if (xfer_mode_changed && H5CX_set_io_xfer_mode(xfer_mode) < 0)
        HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O transfer mode")

    if (chunk_entry->async_info.receive_buffer_array) {
        /* Free leftover receive buffers only once their receives are known to
         * have completed. A buffer with a pending MPI_Imrecv must not be
         * released. Buffers already consumed above were set to NULL. */
        if (receives_complete)
            for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++)
                if (chunk_entry->async_info.receive_buffer_array[i])
                    H5MM_free(chunk_entry->async_info.receive_buffer_array[i]);
        H5MM_free(chunk_entry->async_info.receive_buffer_array);
        chunk_entry->async_info.receive_buffer_array = NULL;
    }
    if (chunk_entry->async_info.receive_requests_array) {
        H5MM_free(chunk_entry->async_info.receive_requests_array);
        chunk_entry->async_info.receive_requests_array = NULL;
    }
    if (tmp_gath_buf)
        H5MM_free(tmp_gath_buf);
    if (file_iter_init && H5S_SELECT_ITER_RELEASE(file_iter) < 0)
        HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
    if (file_iter)
        H5MM_free(file_iter);
    if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
        HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
    if (mem_iter)
        H5MM_free(mem_iter);
    if (dataspace)
        if (H5S_close(dataspace) < 0)
            HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")

    FUNC_LEAVE_NOAPI(ret_value)
}
#endif