#ifdef H5_HAVE_LIBHDFS
#include "H5FDdrvr_module.h"
#endif
#include "H5private.h"
#include "H5Eprivate.h"
#include "H5FDprivate.h"
#include "H5FDhdfs.h"
#include "H5FLprivate.h"
#include "H5Iprivate.h"
#include "H5MMprivate.h"
#ifdef H5_HAVE_LIBHDFS
H5_GCC_DIAG_OFF(strict-prototypes)
H5_GCC_DIAG_OFF(undef)
#include <hdfs.h>
H5_GCC_DIAG_ON(strict-prototypes)
H5_GCC_DIAG_ON(undef)
/* Set to 1 to make the driver callbacks print "called <function>." to stdout. */
#define HDFS_DEBUG 0
/* Set to 1 to compile in the per-file read statistics (bins, reset and report). */
#define HDFS_STATS 0
/* Identifier of the registered HDFS driver. Starts at 0, is assigned by
 * H5FD_hdfs_init() and is reset to 0 by H5FD__hdfs_term(). */
static hid_t H5FD_HDFS_g = 0;
#if HDFS_STATS
#define HDFS_STATS_STARTING_MIN 0xfffffffful
#define HDFS_STATS_BASE 2
#define HDFS_STATS_INTERVAL 1
#define HDFS_STATS_START_POWER 10
#define HDFS_STATS_BIN_COUNT 16
#define HDFS_STATS_POW(bin_i, out_ptr) { \
unsigned long long donotshadowresult = 1; \
unsigned donotshadowindex = 0; \
for(donotshadowindex = 0; \
donotshadowindex < (((bin_i) * HDFS_STATS_INTERVAL) + \
HDFS_STATS_START_POWER); \
donotshadowindex++) \
{ \
donotshadowresult *= HDFS_STATS_BASE; \
} \
*(out_ptr) = donotshadowresult; \
}
static unsigned long long hdfs_stats_boundaries[HDFS_STATS_BIN_COUNT];
typedef struct {
unsigned long long count;
unsigned long long bytes;
unsigned long long min;
unsigned long long max;
} hdfs_statsbin;
#endif
/* Sanity marker stored in hdfs_t.magic while a handle is valid. The handle
 * open/close routines increment the field just before freeing the handle so a
 * stale pointer no longer matches. */
#define HDFS_HDFST_MAGIC 0x1AD5DE84
/* Bundle of the libhdfs objects that together represent one opened file. */
typedef struct {
unsigned long magic; /* HDFS_HDFST_MAGIC while the handle is live */
hdfsFS filesystem; /* connection returned by hdfsBuilderConnect() */
hdfsFileInfo *fileinfo; /* result of hdfsGetPathInfo() for the path */
hdfsFile file; /* stream returned by hdfsOpenFile() */
} hdfs_t;
/* Driver-private file structure. The callbacks in this file cast between
 * H5FD_t* and H5FD_hdfs_t*, so `pub` has to remain the first member. */
typedef struct H5FD_hdfs_t {
H5FD_t pub; /* generic VFD part; must be first */
H5FD_hdfs_fapl_t fa; /* copy of the configuration used at open time */
haddr_t eoa; /* end-of-allocation value set via set_eoa */
hdfs_t *hdfs_handle; /* libhdfs handle bundle for the opened file */
#if HDFS_STATS
/* One bin per boundary plus a final overflow bin for larger reads. */
hdfs_statsbin meta[HDFS_STATS_BIN_COUNT + 1];
hdfs_statsbin raw[HDFS_STATS_BIN_COUNT + 1];
#endif
} H5FD_hdfs_t;
/* Largest address the driver accepts: every bit of an HDoff_t-sized integer
 * set except the top one (presumably the sign bit of HDoff_t -- confirm). */
#define MAXADDR (((haddr_t)1 << (8 * sizeof(HDoff_t) - 1)) - 1)
/* True when A is HADDR_UNDEF or has any bit set outside MAXADDR. */
#define ADDR_OVERFLOW(A) (HADDR_UNDEF==(A) || ((A) & ~(haddr_t)MAXADDR))
/* Forward declarations of the driver callbacks referenced by the
 * H5FD_hdfs_g class table, plus the configuration validator. */
static herr_t H5FD__hdfs_term(void);
static void *H5FD__hdfs_fapl_get(H5FD_t *_file);
static void *H5FD__hdfs_fapl_copy(const void *_old_fa);
static herr_t H5FD__hdfs_fapl_free(void *_fa);
static H5FD_t *H5FD__hdfs_open(const char *name, unsigned flags, hid_t fapl_id,
haddr_t maxaddr);
static herr_t H5FD__hdfs_close(H5FD_t *_file);
static int H5FD__hdfs_cmp(const H5FD_t *_f1, const H5FD_t *_f2);
static herr_t H5FD__hdfs_query(const H5FD_t *_f1, unsigned long *flags);
static haddr_t H5FD__hdfs_get_eoa(const H5FD_t *_file, H5FD_mem_t type);
static herr_t H5FD__hdfs_set_eoa(H5FD_t *_file, H5FD_mem_t type, haddr_t addr);
static haddr_t H5FD__hdfs_get_eof(const H5FD_t *_file, H5FD_mem_t type);
static herr_t H5FD__hdfs_get_handle(H5FD_t *_file, hid_t fapl,
void** file_handle);
static herr_t H5FD__hdfs_read(H5FD_t *_file, H5FD_mem_t type, hid_t fapl_id,
haddr_t addr, size_t size, void *buf);
static herr_t H5FD__hdfs_write(H5FD_t *_file, H5FD_mem_t type, hid_t fapl_id,
haddr_t addr, size_t size, const void *buf);
static herr_t H5FD__hdfs_truncate(H5FD_t *_file, hid_t dxpl_id,
hbool_t closing);
static herr_t H5FD__hdfs_lock(H5FD_t *_file, hbool_t rw);
static herr_t H5FD__hdfs_unlock(H5FD_t *_file);
/* Not a class-table callback: checks a user-supplied H5FD_hdfs_fapl_t. */
static herr_t H5FD__hdfs_validate_config(const H5FD_hdfs_fapl_t * fa);
/* Driver class table handed to H5FD_register() by H5FD_hdfs_init().
 * The initializer is positional, so entry order must match the declaration
 * of H5FD_class_t exactly. NOTE(review): the slot labels below follow the
 * H5FD_class_t layout as commonly declared in H5FDpublic.h -- confirm them
 * against the header this file is built with. */
static const H5FD_class_t H5FD_hdfs_g = {
"hdfs", /* driver name */
MAXADDR, /* maximum address */
H5F_CLOSE_WEAK, /* file close degree */
H5FD__hdfs_term, /* terminate */
NULL, /* (superblock size -- unused) */
NULL, /* (superblock encode -- unused) */
NULL, /* (superblock decode -- unused) */
sizeof(H5FD_hdfs_fapl_t), /* size of the driver's fapl data */
H5FD__hdfs_fapl_get, /* fapl get */
H5FD__hdfs_fapl_copy, /* fapl copy */
H5FD__hdfs_fapl_free, /* fapl free */
0, /* (dxpl data size -- unused) */
NULL, /* (dxpl copy -- unused) */
NULL, /* (dxpl free -- unused) */
H5FD__hdfs_open, /* open */
H5FD__hdfs_close, /* close */
H5FD__hdfs_cmp, /* compare */
H5FD__hdfs_query, /* query */
NULL, /* (get type map -- unused) */
NULL, /* (alloc -- unused) */
NULL, /* (free -- unused) */
H5FD__hdfs_get_eoa, /* get end-of-allocation */
H5FD__hdfs_set_eoa, /* set end-of-allocation */
H5FD__hdfs_get_eof, /* get end-of-file */
H5FD__hdfs_get_handle, /* get handle */
H5FD__hdfs_read, /* read */
H5FD__hdfs_write, /* write (always fails: driver is read-only) */
NULL, /* (flush -- unused) */
H5FD__hdfs_truncate, /* truncate (always fails: driver is read-only) */
H5FD__hdfs_lock, /* lock (no-op) */
H5FD__hdfs_unlock, /* unlock (no-op) */
H5FD_FLMAP_DICHOTOMY /* free-list map */
};
/* Free list used by open/close to allocate and release H5FD_hdfs_t structs. */
H5FL_DEFINE_STATIC(H5FD_hdfs_t);
/*
 * H5FD__init_package
 *
 * Package initialization hook: registers the HDFS driver by delegating to
 * H5FD_hdfs_init().
 *
 * Returns SUCCEED, or FAIL when H5FD_hdfs_init() yields a negative value.
 */
static herr_t
H5FD__init_package(void)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    if(H5FD_hdfs_init() < 0) {
        HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, FAIL, "unable to initialize hdfs VFD")
    }

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD_hdfs_init
 *
 * Registers the HDFS driver class unless H5FD_HDFS_g already names a VFL
 * identifier, and (when HDFS_STATS is enabled) fills the table of bin
 * boundaries used by the read statistics.
 *
 * Returns the value of H5FD_HDFS_g after the registration attempt.
 */
hid_t
H5FD_hdfs_init(void)
{
#if HDFS_STATS
    unsigned int bin_i;
#endif
    hid_t ret_value = H5I_INVALID_HID;

    FUNC_ENTER_NOAPI(H5I_INVALID_HID)

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    /* Register only once; a live VFL id means a previous call succeeded. */
    if(H5I_get_type(H5FD_HDFS_g) != H5I_VFL) {
        H5FD_HDFS_g = H5FD_register(&H5FD_hdfs_g, sizeof(H5FD_class_t), FALSE);
    }

#if HDFS_STATS
    /* Pre-compute the upper boundary of every statistics bin. */
    for(bin_i = 0; bin_i < HDFS_STATS_BIN_COUNT; bin_i++) {
        unsigned long long boundary = 0;

        HDFS_STATS_POW(bin_i, &boundary)
        hdfs_stats_boundaries[bin_i] = boundary;
    }
#endif

    ret_value = H5FD_HDFS_g;

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_term
 *
 * Driver termination callback: forgets the registered driver identifier by
 * resetting H5FD_HDFS_g to 0. Always returns SUCCEED.
 */
static herr_t
H5FD__hdfs_term(void)
{
    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    /* A later H5FD_hdfs_init() will see a non-VFL id and register again. */
    H5FD_HDFS_g = 0;

    FUNC_LEAVE_NOAPI(SUCCEED)
}
static hdfs_t *
H5FD__hdfs_handle_open(const char *path, const char *namenode_name,
const int32_t namenode_port, const char *user_name,
const char *kerberos_ticket_cache, const int32_t stream_buffer_size)
{
struct hdfsBuilder *builder = NULL;
hdfs_t *handle = NULL;
hdfs_t *ret_value = NULL;
FUNC_ENTER_STATIC
#if HDFS_DEBUG
HDfprintf(stdout, "called %s.\n", FUNC);
#endif
if(path == NULL || path[0] == '\0')
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "path cannot be null")
if(namenode_name == NULL)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "namenode name cannot be null")
if(namenode_port < 0 || namenode_port > 65535)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "namenode port must be non-negative and <= 65535")
if(stream_buffer_size < 0)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "buffer size must non-negative")
handle = (hdfs_t *)H5MM_malloc(sizeof(hdfs_t));
if(handle == NULL)
HGOTO_ERROR(H5E_ARGS, H5E_CANTALLOC, NULL, "could not malloc space for handle")
handle->magic = (unsigned long)HDFS_HDFST_MAGIC;
handle->filesystem = NULL;
handle->fileinfo = NULL;
handle->file = NULL;
builder = hdfsNewBuilder();
if(!builder)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "(hdfs) failed to create builder")
hdfsBuilderSetNameNode(builder, namenode_name);
hdfsBuilderSetNameNodePort(builder, (tPort)namenode_port);
if(user_name != NULL && user_name[0] != '\0')
hdfsBuilderSetUserName(builder, user_name);
if(kerberos_ticket_cache != NULL && kerberos_ticket_cache[0] != '\0')
hdfsBuilderSetKerbTicketCachePath(builder, kerberos_ticket_cache);
handle->filesystem = hdfsBuilderConnect(builder);
if(!handle->filesystem)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "(hdfs) could not connect to default namenode")
handle->fileinfo = hdfsGetPathInfo(handle->filesystem, path);
if(!handle->fileinfo)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "hdfsGetPathInfo failed")
handle->file = hdfsOpenFile(handle->filesystem, path, O_RDONLY, stream_buffer_size, 0, 0);
if(!handle->file)
HGOTO_ERROR(H5E_VFL, H5E_CANTOPENFILE, NULL, "(hdfs) could not open")
ret_value = handle;
done:
if(ret_value == NULL && handle != NULL) {
HDassert(handle->magic == HDFS_HDFST_MAGIC);
handle->magic++;
if(handle->file != NULL)
if(FAIL == (hdfsCloseFile(handle->filesystem, handle->file)))
HDONE_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, NULL, "unable to close hdfs file handle")
if(handle->fileinfo != NULL)
hdfsFreeFileInfo(handle->fileinfo, 1);
if(handle->filesystem != NULL)
if(FAIL == (hdfsDisconnect(handle->filesystem)))
HDONE_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, NULL, "unable to disconnect from hdfs")
H5MM_xfree(handle);
}
FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_handle_close
 *
 * Tears down an hdfs_t created by H5FD__hdfs_handle_open(): closes the file,
 * frees the file info, disconnects from the filesystem and frees the handle.
 *
 * Returns FAIL without touching the handle when it is NULL or its magic does
 * not match. A failing hdfsCloseFile()/hdfsDisconnect() is recorded and makes
 * the function return FAIL, but teardown still continues and the handle
 * memory is still freed.
 */
static herr_t
H5FD__hdfs_handle_close(hdfs_t *handle)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    if(NULL == handle) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "handle cannot be null")
    }
    if(HDFS_HDFST_MAGIC != handle->magic) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "handle has invalid magic")
    }

    /* Invalidate first so a stale copy of the pointer fails the magic check. */
    handle->magic++;

    if(handle->file != NULL && FAIL == hdfsCloseFile(handle->filesystem, handle->file)) {
        HDONE_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to close hdfs file handle")
    }

    if(handle->fileinfo != NULL) {
        hdfsFreeFileInfo(handle->fileinfo, 1);
    }

    if(handle->filesystem != NULL && FAIL == hdfsDisconnect(handle->filesystem)) {
        HDONE_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to disconnect hdfs file system")
    }

    H5MM_xfree(handle);

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_validate_config
 *
 * Checks a driver configuration: the version must equal
 * H5FD__CURR_HDFS_FAPL_T_VERSION and the namenode port must lie in
 * [0, 65535].
 *
 * fa  configuration to check; must not be NULL (asserted)
 *
 * Returns SUCCEED when the configuration is acceptable, FAIL otherwise.
 */
static herr_t
H5FD__hdfs_validate_config(const H5FD_hdfs_fapl_t * fa)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

    HDassert(fa != NULL);

    if(H5FD__CURR_HDFS_FAPL_T_VERSION != fa->version) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "Unknown H5FD_hdfs_fapl_t version");
    }

    /* Both out-of-range directions report the same error. */
    if(fa->namenode_port > 65535 || fa->namenode_port < 0) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "Invalid namenode port number");
    }

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5Pset_fapl_hdfs
 *
 * Public API: selects the HDFS driver on a file access property list and
 * attaches the given configuration to it.
 *
 * fapl_id  file access property list to modify
 * fa       driver configuration; must be non-NULL and pass
 *          H5FD__hdfs_validate_config()
 *
 * Returns the result of H5P_set_driver() on success, FAIL when `fa` is NULL,
 * `fapl_id` is not a file access property list, or the configuration is
 * invalid.
 */
herr_t
H5Pset_fapl_hdfs(hid_t fapl_id, H5FD_hdfs_fapl_t *fa)
{
    H5P_genplist_t *plist     = NULL;
    herr_t          ret_value = FAIL;

    FUNC_ENTER_API(FAIL)
    H5TRACE2("e", "i*x", fapl_id, fa);

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    /* `fa` comes straight from the application. Report a NULL pointer as an
     * ordinary argument error: an assertion would vanish in release builds
     * and the validator below would then dereference NULL. */
    if(fa == NULL)
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "fa is NULL")

    plist = H5P_object_verify(fapl_id, H5P_FILE_ACCESS);
    if(plist == NULL)
        HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list")

    if(FAIL == H5FD__hdfs_validate_config(fa))
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "invalid hdfs config")

    ret_value = H5P_set_driver(plist, H5FD_HDFS, (void *)fa);

done:
    FUNC_LEAVE_API(ret_value)
}
/*
 * H5Pget_fapl_hdfs
 *
 * Public API: copies the HDFS driver configuration stored on a file access
 * property list into *fa_out.
 *
 * fapl_id  file access property list to query
 * fa_out   destination for the configuration; must not be NULL
 *
 * Returns SUCCEED, or FAIL when fa_out is NULL, fapl_id is not a file access
 * property list, the list's driver is not H5FD_HDFS, or no driver info is
 * attached.
 */
herr_t
H5Pget_fapl_hdfs(hid_t fapl_id, H5FD_hdfs_fapl_t *fa_out)
{
    const H5FD_hdfs_fapl_t *stored    = NULL;
    H5P_genplist_t         *plist     = NULL;
    herr_t                  ret_value = SUCCEED;

    FUNC_ENTER_API(FAIL)
    H5TRACE2("e", "i*x", fapl_id, fa_out);

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    if(NULL == fa_out) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "fa_out is NULL")
    }

    if(NULL == (plist = H5P_object_verify(fapl_id, H5P_FILE_ACCESS))) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access list")
    }

    /* The list must have been configured for this driver. */
    if(H5P_peek_driver(plist) != H5FD_HDFS) {
        HGOTO_ERROR(H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver")
    }

    stored = (const H5FD_hdfs_fapl_t *)H5P_peek_driver_info(plist);
    if(NULL == stored) {
        HGOTO_ERROR(H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info")
    }

    /* Hand the caller its own copy of the stored configuration. */
    HDmemcpy(fa_out, stored, sizeof(H5FD_hdfs_fapl_t));

done:
    FUNC_LEAVE_API(ret_value)
}
/*
 * H5FD__hdfs_fapl_get
 *
 * Returns a newly allocated copy of the configuration stored in an open
 * file's driver struct, or NULL when the allocation fails. The copy is
 * allocated with H5MM_calloc().
 */
static void *
H5FD__hdfs_fapl_get(H5FD_t *_file)
{
    H5FD_hdfs_t      *file      = (H5FD_hdfs_t*)_file;
    H5FD_hdfs_fapl_t *copy      = NULL;
    void             *ret_value = NULL;

    FUNC_ENTER_STATIC

    if(NULL == (copy = (H5FD_hdfs_fapl_t *)H5MM_calloc(sizeof(H5FD_hdfs_fapl_t)))) {
        HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "memory allocation failed")
    }

    HDmemcpy(copy, &(file->fa), sizeof(H5FD_hdfs_fapl_t));
    ret_value = copy;

done:
    /* Release the buffer if it was allocated but is not being returned. */
    if(NULL == ret_value && NULL != copy) {
        H5MM_xfree(copy);
    }

    FUNC_LEAVE_NOAPI(ret_value)
}
static void *
H5FD__hdfs_fapl_copy(const void *_old_fa)
{
const H5FD_hdfs_fapl_t *old_fa = (const H5FD_hdfs_fapl_t*)_old_fa;
H5FD_hdfs_fapl_t *new_fa = NULL;
void *ret_value = NULL;
FUNC_ENTER_STATIC
new_fa = (H5FD_hdfs_fapl_t *)H5MM_malloc(sizeof(H5FD_hdfs_fapl_t));
if(new_fa == NULL)
HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "memory allocation failed")
HDmemcpy(new_fa, old_fa, sizeof(H5FD_hdfs_fapl_t));
ret_value = new_fa;
done:
if(ret_value == NULL && new_fa != NULL)
H5MM_xfree(new_fa);
FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_fapl_free
 *
 * Releases a driver configuration buffer with H5MM_xfree().
 * The pointer must not be NULL (asserted). Always returns SUCCEED.
 */
static herr_t
H5FD__hdfs_fapl_free(void *_fa)
{
    H5FD_hdfs_fapl_t *config = (H5FD_hdfs_fapl_t*)_fa;

    FUNC_ENTER_STATIC_NOERR

    HDassert(config != NULL);

    H5MM_xfree(config);

    FUNC_LEAVE_NOAPI(SUCCEED)
}
#if HDFS_STATS
/*
 * hdfs__reset_stats
 *
 * Puts every raw and metadata statistics bin of `file` (including the final
 * overflow bin) back into its empty state: zero count, bytes and max, and
 * min set to HDFS_STATS_STARTING_MIN.
 *
 * Returns SUCCEED, or FAIL when `file` is NULL.
 */
static herr_t
hdfs__reset_stats(H5FD_hdfs_t *file)
{
    unsigned bin_i     = 0;
    herr_t   ret_value = SUCCEED;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDprintf("hdfs__reset_stats() called\n");
#endif

    if(NULL == file) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file was null")
    }

    /* `<=` on purpose: the arrays hold HDFS_STATS_BIN_COUNT + 1 bins. */
    for(bin_i = 0; bin_i <= HDFS_STATS_BIN_COUNT; bin_i++) {
        hdfs_statsbin *raw_bin  = &file->raw[bin_i];
        hdfs_statsbin *meta_bin = &file->meta[bin_i];

        raw_bin->bytes = 0;
        raw_bin->count = 0;
        raw_bin->min   = (unsigned long long)HDFS_STATS_STARTING_MIN;
        raw_bin->max   = 0;

        meta_bin->bytes = 0;
        meta_bin->count = 0;
        meta_bin->min   = (unsigned long long)HDFS_STATS_STARTING_MIN;
        meta_bin->max   = 0;
    }

done:
    FUNC_LEAVE_NOAPI(ret_value);
}
#endif
/*
 * H5FD__hdfs_open
 *
 * Driver "open" callback. Validates the arguments, reads the HDFS
 * configuration from the property list, opens the libhdfs handle and wraps
 * it in a freshly allocated H5FD_hdfs_t.
 *
 * path     non-empty path of the file
 * flags    must be exactly H5F_ACC_RDONLY
 * fapl_id  property list carrying the HDFS configuration; the defaults
 *          (H5P_DEFAULT / H5P_FILE_ACCESS_DEFAULT) are rejected
 * maxaddr  must be non-zero, defined and within MAXADDR
 *
 * Returns the new file struct as H5FD_t*, or NULL on failure. On failure the
 * libhdfs handle (if opened) is closed and the file struct (if allocated) is
 * returned to the free list.
 */
static H5FD_t *
H5FD__hdfs_open(const char *path, unsigned flags, hid_t fapl_id, haddr_t maxaddr)
{
    H5FD_t           *ret_value = NULL;
    H5FD_hdfs_t      *new_file  = NULL;
    hdfs_t           *hdfs      = NULL;
    H5FD_hdfs_fapl_t  config;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    HDcompile_assert(sizeof(HDoff_t) >= sizeof(size_t));

    /* ---- argument validation ---- */
    if(NULL == path || '\0' == *path) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "invalid file name")
    }
    if(HADDR_UNDEF == maxaddr || 0 == maxaddr) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr")
    }
    if(ADDR_OVERFLOW(maxaddr)) {
        HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, NULL, "bogus maxaddr")
    }
    if(H5F_ACC_RDONLY != flags) {
        HGOTO_ERROR(H5E_ARGS, H5E_UNSUPPORTED, NULL, "only Read-Only access allowed")
    }
    if(H5P_DEFAULT == fapl_id || H5P_FILE_ACCESS_DEFAULT == fapl_id) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "fapl cannot be H5P_DEFAULT")
    }

    /* ---- fetch the driver configuration ---- */
    if(FAIL == H5Pget_fapl_hdfs(fapl_id, &config)) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "can't get property list")
    }

    /* ---- connect and open through libhdfs ---- */
    hdfs = H5FD__hdfs_handle_open(path, config.namenode_name, config.namenode_port,
                                  config.user_name, config.kerberos_ticket_cache,
                                  config.stream_buffer_size);
    if(NULL == hdfs) {
        HGOTO_ERROR(H5E_VFL, H5E_CANTOPENFILE, NULL, "could not open")
    }
    HDassert(hdfs->magic == HDFS_HDFST_MAGIC);

    /* ---- build the driver's file struct (zero-initialized) ---- */
    new_file = H5FL_CALLOC(H5FD_hdfs_t);
    if(NULL == new_file) {
        HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "unable to allocate file struct")
    }
    new_file->hdfs_handle = hdfs;
    HDmemcpy(&(new_file->fa), &config, sizeof(H5FD_hdfs_fapl_t));

#if HDFS_STATS
    if(FAIL == hdfs__reset_stats(new_file)) {
        HGOTO_ERROR(H5E_INTERNAL, H5E_UNINITIALIZED, NULL, "unable to reset file statistics")
    }
#endif

    ret_value = (H5FD_t*)new_file;

done:
    /* Unwind whatever was acquired if the open did not complete. */
    if(NULL == ret_value) {
        if(hdfs != NULL && FAIL == H5FD__hdfs_handle_close(hdfs)) {
            HDONE_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, NULL, "unable to close HDFS file handle")
        }
        if(new_file != NULL) {
            new_file = H5FL_FREE(H5FD_hdfs_t, new_file);
        }
    }

    FUNC_LEAVE_NOAPI(ret_value)
}
#if HDFS_STATS
/*
 * hdfs__stats_scale
 *
 * Divides `value` by 1024 until it drops below 1024 or the largest known
 * suffix is reached, and stores the matching suffix character (' ', 'K',
 * 'M', 'G', 'T' or 'P') in *suffix_out. Returns the scaled value.
 *
 * The suffix index is clamped to the table, so an enormous value is printed
 * with a 'P' suffix instead of indexing past the end of the table.
 */
static double
hdfs__stats_scale(double value, char *suffix_out)
{
    static const char suffixes[] = {' ', 'K', 'M', 'G', 'T', 'P'};
    const unsigned    last       = (unsigned)(sizeof(suffixes) / sizeof(suffixes[0])) - 1;
    unsigned          suffix_i   = 0;

    while(value >= 1024.0 && suffix_i < last) {
        value /= 1024.0;
        suffix_i++;
    }

    *suffix_out = suffixes[suffix_i];
    return value;
}

/*
 * hdfs__fprint_stats
 *
 * Writes a summary of the read statistics collected for `file` to `stream`:
 * total read count and bytes, then (if any read happened) min/avg/max read
 * sizes and one line per non-empty size bin.
 *
 * stream  destination stream; must not be NULL
 * file    file whose `meta`/`raw` bins are reported; it and its hdfs handle
 *         must be non-NULL and the handle must carry a valid magic value
 *
 * Returns SUCCEED, or FAIL when one of the argument checks fails.
 */
static herr_t
hdfs__fprint_stats(FILE *stream, const H5FD_hdfs_t *file)
{
    herr_t             ret_value    = SUCCEED;
    unsigned           i            = 0;
    /* Same width as the per-bin counters, and matching the %llu conversions. */
    unsigned long long count_meta   = 0;
    unsigned long long count_raw    = 0;
    double             average_meta = 0.0;
    double             average_raw  = 0.0;
    unsigned long long min_meta     = (unsigned long long)HDFS_STATS_STARTING_MIN;
    unsigned long long min_raw      = (unsigned long long)HDFS_STATS_STARTING_MIN;
    unsigned long long max_meta     = 0;
    unsigned long long max_raw      = 0;
    unsigned long long bytes_raw    = 0;
    unsigned long long bytes_meta   = 0;
    double             scaled       = 0.0;
    char               suffix       = ' ';

    FUNC_ENTER_STATIC

    if(stream == NULL)
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file stream cannot be null")
    if(file == NULL)
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file cannot be null")
    if(file->hdfs_handle == NULL)
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "hdfs handle cannot be null")
    if(file->hdfs_handle->magic != HDFS_HDFST_MAGIC)
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "hdfs handle has invalid magic")

    /* ---- aggregate over all bins, including the overflow bin ---- */
    for(i = 0; i <= HDFS_STATS_BIN_COUNT; i++) {
        const hdfs_statsbin *r = &file->raw[i];
        const hdfs_statsbin *m = &file->meta[i];

        if(m->min < min_meta)
            min_meta = m->min;
        if(r->min < min_raw)
            min_raw = r->min;
        if(m->max > max_meta)
            max_meta = m->max;
        if(r->max > max_raw)
            max_raw = r->max;

        count_raw  += r->count;
        count_meta += m->count;
        bytes_raw  += r->bytes;
        bytes_meta += m->bytes;
    }

    if(count_raw > 0)
        average_raw = (double)bytes_raw / (double)count_raw;
    if(count_meta > 0)
        average_meta = (double)bytes_meta / (double)count_meta;

    /* ---- totals ---- */
    HDfprintf(stream, "TOTAL READS: %llu (%llu meta, %llu raw)\n",
              count_raw + count_meta, count_meta, count_raw);
    HDfprintf(stream, "TOTAL BYTES: %llu (%llu meta, %llu raw)\n",
              bytes_raw + bytes_meta, bytes_meta, bytes_raw);

    /* Nothing more to report when no read was recorded. */
    if(count_raw + count_meta == 0)
        HGOTO_DONE(SUCCEED)

    /* ---- min / avg / max read sizes ---- */
    HDfprintf(stream, "SIZES meta raw\n");

    HDfprintf(stream, " min ");
    if(count_meta == 0)
        HDfprintf(stream, " 0.000 ");
    else {
        scaled = hdfs__stats_scale((double)min_meta, &suffix);
        HDfprintf(stream, "%8.3lf%c ", scaled, suffix);
    }
    if(count_raw == 0)
        HDfprintf(stream, " 0.000 \n");
    else {
        scaled = hdfs__stats_scale((double)min_raw, &suffix);
        HDfprintf(stream, "%8.3lf%c\n", scaled, suffix);
    }

    HDfprintf(stream, " avg ");
    scaled = hdfs__stats_scale(average_meta, &suffix);
    HDfprintf(stream, "%8.3lf%c ", scaled, suffix);
    scaled = hdfs__stats_scale(average_raw, &suffix);
    HDfprintf(stream, "%8.3lf%c\n", scaled, suffix);

    HDfprintf(stream, " max ");
    scaled = hdfs__stats_scale((double)max_meta, &suffix);
    HDfprintf(stream, "%8.3lf%c ", scaled, suffix);
    scaled = hdfs__stats_scale((double)max_raw, &suffix);
    HDfprintf(stream, "%8.3lf%c\n", scaled, suffix);

    /* ---- per-bin table ---- */
    HDfprintf(stream,
              "BINS # of reads total bytes average size\n");
    HDfprintf(stream,
              " up-to meta raw meta raw meta raw\n");

    for(i = 0; i <= HDFS_STATS_BIN_COUNT; i++) {
        const hdfs_statsbin *m         = &file->meta[i];
        const hdfs_statsbin *r         = &file->raw[i];
        unsigned long long   range_end = 0;
        char                 re_suffix = ' ';
        double               re_val    = 0.0;
        char                 bm_suffix = ' ';
        double               bm_val    = 0.0;
        char                 br_suffix = ' ';
        double               br_val    = 0.0;
        char                 am_suffix = ' ';
        double               am_val    = 0.0;
        char                 ar_suffix = ' ';
        double               ar_val    = 0.0;

        if(r->count == 0 && m->count == 0)
            continue;

        /* The overflow bin has no boundary of its own: it is labelled
         * "> last boundary". Index the table only with valid subscripts. */
        if(i == HDFS_STATS_BIN_COUNT) {
            range_end = hdfs_stats_boundaries[HDFS_STATS_BIN_COUNT - 1];
            HDfprintf(stream, ">");
        }
        else {
            range_end = hdfs_stats_boundaries[i];
            HDfprintf(stream, " ");
        }

        bm_val = hdfs__stats_scale((double)m->bytes, &bm_suffix);
        br_val = hdfs__stats_scale((double)r->bytes, &br_suffix);

        if(m->count > 0)
            am_val = (double)(m->bytes) / (double)(m->count);
        am_val = hdfs__stats_scale(am_val, &am_suffix);

        if(r->count > 0)
            ar_val = (double)(r->bytes) / (double)(r->count);
        ar_val = hdfs__stats_scale(ar_val, &ar_suffix);

        re_val = hdfs__stats_scale((double)range_end, &re_suffix);

        /* Counts are unsigned long long, hence %llu rather than %d. */
        HDfprintf(
            stream,
            " %8.3f%c %7llu %7llu %8.3f%c %8.3f%c %8.3f%c %8.3f%c\n",
            re_val, re_suffix,
            m->count,
            r->count,
            bm_val, bm_suffix,
            br_val, br_suffix,
            am_val, am_suffix,
            ar_val, ar_suffix);
        HDfflush(stream);
    }

done:
    FUNC_LEAVE_NOAPI(ret_value);
}
#endif
/*
 * H5FD__hdfs_close
 *
 * Driver "close" callback: (optionally) prints the read statistics, closes
 * the libhdfs handle and returns the file struct to its free list.
 *
 * _file  file to close; it and its hdfs handle must be valid (asserted)
 *
 * Returns SUCCEED, or FAIL when printing the statistics or closing the
 * handle fails. On FAIL the file struct is not released.
 */
static herr_t
H5FD__hdfs_close(H5FD_t *_file)
{
    herr_t       ret_value = SUCCEED;
    H5FD_hdfs_t *file      = (H5FD_hdfs_t *)_file;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    HDassert(file != NULL);
    HDassert(file->hdfs_handle != NULL);
    HDassert(file->hdfs_handle->magic == HDFS_HDFST_MAGIC);

#if HDFS_STATS
    /* The report validates file->hdfs_handle (pointer and magic), so it has
     * to run while the handle is still alive. Running it after
     * H5FD__hdfs_handle_close() would read freed memory. */
    if(FAIL == hdfs__fprint_stats(stdout, file))
        HGOTO_ERROR(H5E_INTERNAL, H5E_ERROR, FAIL, "problem while writing file statistics")
#endif

    if(file->hdfs_handle != NULL)
        if(FAIL == H5FD__hdfs_handle_close(file->hdfs_handle))
            HGOTO_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to close HDFS file handle")

    file = H5FL_FREE(H5FD_hdfs_t, file);

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_cmp
 *
 * Driver "compare" callback. Two files are considered the same when every
 * field of their hdfsFileInfo records matches.
 *
 * _f1, _f2  files to compare; both must carry a valid hdfs handle with
 *           file info attached (asserted)
 *
 * Returns 0 when all fields match and -1 otherwise. The function never
 * returns a positive value, so it reports equality only, not an ordering.
 */
static int
H5FD__hdfs_cmp(const H5FD_t *_f1, const H5FD_t *_f2)
{
    int                ret_value = 0;
    const H5FD_hdfs_t *f1        = (const H5FD_hdfs_t *)_f1;
    const H5FD_hdfs_t *f2        = (const H5FD_hdfs_t *)_f2;
    hdfsFileInfo      *finfo1    = NULL;
    hdfsFileInfo      *finfo2    = NULL;

    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    HDassert(f1->hdfs_handle != NULL);
    HDassert(f2->hdfs_handle != NULL);
    HDassert(f1->hdfs_handle->magic == HDFS_HDFST_MAGIC);
    HDassert(f2->hdfs_handle->magic == HDFS_HDFST_MAGIC);

    finfo1 = f1->hdfs_handle->fileinfo;
    finfo2 = f2->hdfs_handle->fileinfo;
    HDassert(finfo1 != NULL);
    HDassert(finfo2 != NULL);

    if(finfo1->mKind != finfo2->mKind) { HGOTO_DONE(-1); }
    /* mName is a string like mOwner/mGroup: compare its contents. Each
     * hdfsGetPathInfo() result owns its own buffer, so comparing the
     * pointers would report two opens of the same file as different. */
    if(HDstrcmp(finfo1->mName, finfo2->mName)) { HGOTO_DONE(-1); }
    if(finfo1->mLastMod != finfo2->mLastMod) { HGOTO_DONE(-1); }
    if(finfo1->mSize != finfo2->mSize) { HGOTO_DONE(-1); }
    if(finfo1->mReplication != finfo2->mReplication) { HGOTO_DONE(-1); }
    if(finfo1->mBlockSize != finfo2->mBlockSize) { HGOTO_DONE(-1); }
    if(HDstrcmp(finfo1->mOwner, finfo2->mOwner)) { HGOTO_DONE(-1); }
    if(HDstrcmp(finfo1->mGroup, finfo2->mGroup)) { HGOTO_DONE(-1); }
    if(finfo1->mPermissions != finfo2->mPermissions) { HGOTO_DONE(-1); }
    if(finfo1->mLastAccess != finfo2->mLastAccess) { HGOTO_DONE(-1); }

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_query
 *
 * Driver "query" callback: reports the feature flags of this driver.
 * When `flags` is non-NULL it receives exactly H5FD_FEAT_DATA_SIEVE; a NULL
 * `flags` is accepted and ignored. Always returns SUCCEED.
 */
static herr_t
H5FD__hdfs_query(const H5FD_t H5_ATTR_UNUSED *_file, unsigned long *flags)
{
    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    if(NULL != flags) {
        /* The only feature advertised by this driver. */
        *flags = H5FD_FEAT_DATA_SIEVE;
    }

    FUNC_LEAVE_NOAPI(SUCCEED)
}
/*
 * H5FD__hdfs_get_eoa
 *
 * Driver "get_eoa" callback: returns the end-of-allocation value currently
 * stored in the file struct. The memory type argument is ignored.
 */
static haddr_t
H5FD__hdfs_get_eoa(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type)
{
    const H5FD_hdfs_t *hdfs_file = (const H5FD_hdfs_t *)_file;

    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    FUNC_LEAVE_NOAPI(hdfs_file->eoa)
}
/*
 * H5FD__hdfs_set_eoa
 *
 * Driver "set_eoa" callback: stores `addr` as the file's end-of-allocation
 * value. The memory type argument is ignored. Always returns SUCCEED.
 */
static herr_t
H5FD__hdfs_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, haddr_t addr)
{
    H5FD_hdfs_t *hdfs_file = (H5FD_hdfs_t *)_file;

    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    hdfs_file->eoa = addr;

    FUNC_LEAVE_NOAPI(SUCCEED)
}
/*
 * H5FD__hdfs_get_eof
 *
 * Driver "get_eof" callback: returns the size of the file as recorded in the
 * hdfsFileInfo obtained when the file was opened. The memory type argument
 * is ignored.
 *
 * _file  file to query; its hdfs handle must be valid (asserted)
 */
static haddr_t
H5FD__hdfs_get_eof(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type)
{
    const H5FD_hdfs_t *file = (const H5FD_hdfs_t *)_file;

    FUNC_ENTER_STATIC_NOERR

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    HDassert(file->hdfs_handle != NULL);
    HDassert(file->hdfs_handle->magic == HDFS_HDFST_MAGIC);

    /* Convert straight to the return type. Going through size_t would
     * truncate the size of large files where size_t is narrower than
     * haddr_t. */
    FUNC_LEAVE_NOAPI((haddr_t) file->hdfs_handle->fileinfo->mSize)
}
/*
 * H5FD__hdfs_get_handle
 *
 * Driver "get_handle" callback: stores the file's hdfs_t pointer in
 * *file_handle. The property list argument is ignored.
 *
 * Returns SUCCEED, or FAIL when `file_handle` is NULL.
 */
static herr_t
H5FD__hdfs_get_handle(H5FD_t *_file, hid_t H5_ATTR_UNUSED fapl, void **file_handle)
{
    herr_t       ret_value = SUCCEED;
    H5FD_hdfs_t *hdfs_file = (H5FD_hdfs_t *)_file;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    if(NULL == file_handle) {
        HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file handle not valid")
    }

    *file_handle = hdfs_file->hdfs_handle;

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_read
 *
 * Driver "read" callback: reads exactly `size` bytes starting at `addr` into
 * `buf` using positional reads.
 *
 * _file  file to read from; its hdfs handle must be valid (asserted)
 * type   memory type; only used to pick the statistics table when
 *        HDFS_STATS is enabled
 * addr   offset of the first byte to read
 * size   number of bytes to read
 * buf    destination buffer of at least `size` bytes; must not be NULL
 *
 * Returns SUCCEED when all `size` bytes were read. Returns FAIL when the
 * requested range does not lie inside the file, when hdfsPread() reports an
 * error, or when the stream ends before the request is satisfied.
 */
static herr_t
H5FD__hdfs_read(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type,
    hid_t H5_ATTR_UNUSED dxpl_id, haddr_t addr, size_t size, void *buf)
{
    herr_t         ret_value   = SUCCEED;
    H5FD_hdfs_t   *file        = (H5FD_hdfs_t *)_file;
    haddr_t        filesize    = 0;
    /* Largest byte count that one hdfsPread() call can be asked for; its
     * length parameter is a signed 32-bit tSize. */
    const size_t   max_request = (size_t)0x7fffffff;
    unsigned char *dst         = (unsigned char *)buf;
    haddr_t        offset      = addr;
    size_t         remaining   = size;
#if HDFS_STATS
    hdfs_statsbin *bin   = NULL;
    unsigned       bin_i = 0;
#endif

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    HDassert(file != NULL);
    HDassert(file->hdfs_handle != NULL);
    HDassert(file->hdfs_handle->magic == HDFS_HDFST_MAGIC);
    HDassert(buf != NULL);

    /* Range check written so that it cannot wrap: `addr + size` may
     * overflow, `filesize - addr` cannot once addr <= filesize is known. */
    filesize = (haddr_t) file->hdfs_handle->fileinfo->mSize;
    if((addr > filesize) || ((haddr_t)size > (filesize - addr)))
        HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, FAIL, "range exceeds file address")

    /* hdfsPread() returns the number of bytes actually read, which may be
     * fewer than requested, so keep reading until the request is complete. */
    while(remaining > 0) {
        tSize request = (tSize)((remaining > max_request) ? max_request : remaining);
        tSize got     = hdfsPread(file->hdfs_handle->filesystem, file->hdfs_handle->file,
                                  (tOffset)offset, dst, request);

        if(got < 0)
            HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "unable to execute read")
        if(got == 0)
            /* End of stream inside a range that the file info says exists. */
            HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "unexpected end of file during read")

        dst       += got;
        offset    += (haddr_t)got;
        remaining -= (size_t)got;
    }

#if HDFS_STATS
    /* Find the first bin whose boundary exceeds the request size; requests
     * larger than every boundary land in the final overflow bin. */
    for(bin_i = 0; bin_i < HDFS_STATS_BIN_COUNT; bin_i++)
        if((unsigned long long)size < hdfs_stats_boundaries[bin_i])
            break;
    bin = (type == H5FD_MEM_DRAW)
        ? &file->raw[bin_i]
        : &file->meta[bin_i];

    if(bin->count == 0) {
        bin->min = size;
        bin->max = size;
    }
    else {
        if(size < bin->min)
            bin->min = size;
        if(size > bin->max)
            bin->max = size;
    }
    bin->count++;
    bin->bytes += (unsigned long long)size;
#endif

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_write
 *
 * Driver "write" callback. The driver only supports read-only access, so
 * every call records an "unsupported" error and returns FAIL. All arguments
 * are ignored.
 */
static herr_t
H5FD__hdfs_write(H5FD_t H5_ATTR_UNUSED *_file, H5FD_mem_t H5_ATTR_UNUSED type,
    hid_t H5_ATTR_UNUSED dxpl_id, haddr_t H5_ATTR_UNUSED addr,
    size_t H5_ATTR_UNUSED size, const void H5_ATTR_UNUSED *buf)
{
    herr_t ret_value = FAIL;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    /* Unconditional failure: there is no write path. */
    HGOTO_ERROR(H5E_VFL, H5E_UNSUPPORTED, FAIL, "cannot write to read-only file")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_truncate
 *
 * Driver "truncate" callback. The driver only supports read-only access, so
 * every call records an "unsupported" error and returns FAIL. All arguments
 * are ignored.
 */
static herr_t
H5FD__hdfs_truncate(H5FD_t H5_ATTR_UNUSED *_file, hid_t H5_ATTR_UNUSED dxpl_id,
    hbool_t H5_ATTR_UNUSED closing)
{
    herr_t ret_value = SUCCEED;

    FUNC_ENTER_STATIC

#if HDFS_DEBUG
    HDfprintf(stdout, "called %s.\n", FUNC);
#endif

    /* Unconditional failure: the error macro replaces the initial SUCCEED. */
    HGOTO_ERROR(H5E_VFL, H5E_UNSUPPORTED, FAIL, "cannot truncate read-only file")

done:
    FUNC_LEAVE_NOAPI(ret_value)
}
/*
 * H5FD__hdfs_lock
 *
 * Driver "lock" callback. Does nothing and always returns SUCCEED; both
 * arguments are ignored.
 */
static herr_t
H5FD__hdfs_lock(H5FD_t H5_ATTR_UNUSED *_file, hbool_t H5_ATTR_UNUSED rw)
{
    FUNC_ENTER_STATIC_NOERR

    FUNC_LEAVE_NOAPI(SUCCEED)
}
/*
 * H5FD__hdfs_unlock
 *
 * Driver "unlock" callback. Does nothing and always returns SUCCEED; the
 * argument is ignored.
 */
static herr_t
H5FD__hdfs_unlock(H5FD_t H5_ATTR_UNUSED *_file)
{
    FUNC_ENTER_STATIC_NOERR

    FUNC_LEAVE_NOAPI(SUCCEED)
}
#else
/*
 * H5FD_hdfs_init (stub)
 *
 * Variant compiled when H5_HAVE_LIBHDFS is not defined: no driver is
 * registered and the function always returns H5I_INVALID_HID.
 */
hid_t
H5FD_hdfs_init(void)
{
    FUNC_ENTER_NOAPI_NOINIT_NOERR

    FUNC_LEAVE_NOAPI(H5I_INVALID_HID)
}
/*
 * H5Pget_fapl_hdfs (stub)
 *
 * Variant compiled when H5_HAVE_LIBHDFS is not defined: records an
 * "unsupported" error and returns FAIL without touching *fa_out.
 */
herr_t
H5Pget_fapl_hdfs(hid_t fapl_id, H5FD_hdfs_fapl_t *fa_out)
{
    herr_t ret_value = FAIL;

    FUNC_ENTER_API_NOINIT
    H5TRACE2("e", "i*x", fapl_id, fa_out);

    /* Unconditional failure: the driver was not built into this library. */
    HGOTO_ERROR(H5E_VFL, H5E_UNSUPPORTED, FAIL, "HDFS VFD not included in the HDF5 library")

done:
    FUNC_LEAVE_API_NOINIT(ret_value)
}
/*
 * H5Pset_fapl_hdfs (stub)
 *
 * Variant compiled when H5_HAVE_LIBHDFS is not defined: records an
 * "unsupported" error and returns FAIL without modifying the property list.
 */
herr_t
H5Pset_fapl_hdfs(hid_t fapl_id, H5FD_hdfs_fapl_t *fa)
{
    herr_t ret_value = FAIL;

    FUNC_ENTER_API_NOINIT
    H5TRACE2("e", "i*x", fapl_id, fa);

    /* Unconditional failure: the driver was not built into this library. */
    HGOTO_ERROR(H5E_VFL, H5E_UNSUPPORTED, FAIL, "HDFS VFD not included in the HDF5 library")

done:
    FUNC_LEAVE_API_NOINIT(ret_value)
}
#endif