#ifndef UCP_CONTEXT_H_
#define UCP_CONTEXT_H_
#include "ucp_types.h"
#include "ucp_thread.h"
#include <ucp/api/ucp.h>
#include <ucp/dt/dt.h>
#include <ucp/proto/proto.h>
#include <uct/api/uct.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue_types.h>
#include <ucs/datastruct/bitmap.h>
#include <ucs/datastruct/conn_match.h>
#include <ucs/memory/memtype_cache.h>
#include <ucs/memory/memory_type.h>
#include <ucs/type/spinlock.h>
#include <ucs/sys/string.h>
#include <ucs/type/param.h>
/*
 * Flag bits for a transport resource, each defined with UCS_BIT().
 * NOTE(review): judging by the UCP_TL_RSC_FLAG_ prefix these are presumably
 * OR-ed into ucp_tl_resource_desc_t::flags - confirm against the users.
 */
enum {
/* Presumably marks an auxiliary transport resource - confirm */
UCP_TL_RSC_FLAG_AUX = UCS_BIT(0),
/* Presumably marks a resource used for sockaddr-based connections - confirm */
UCP_TL_RSC_FLAG_SOCKADDR = UCS_BIT(1)
};
/*
 * Plain-value settings of a UCP context. This type is embedded as
 * ucp_config::ctx and as ucp_context::config.ext.
 *
 * NOTE(review): the per-group notes below are inferred from the field names
 * and types only - confirm each one against the code that parses and consumes
 * these values before relying on it.
 */
typedef struct ucp_context_config {
/* Size thresholds, presumably selecting between bcopy/zcopy/rendezvous */
size_t bcopy_thresh;
size_t rndv_thresh;
size_t rndv_send_nbr_thresh;
size_t rndv_thresh_fallback;
/* Floating-point tuning ratios; units are not visible here - TODO confirm */
double rndv_perf_diff;
double multi_lane_max_ratio;
size_t zcopy_thresh;
/* Rendezvous mode selector (see ucp_rndv_mode_t) */
ucp_rndv_mode_t rndv_mode;
size_t rkey_ptr_seg_size;
/* Presumably an estimated buffered-copy bandwidth - confirm units */
double bcopy_bw;
size_t seg_size;
/* Per-memory-type rendezvous fragment settings, indexed by memory type */
size_t rndv_frag_size[UCS_MEMORY_TYPE_LAST];
size_t rndv_num_frags[UCS_MEMORY_TYPE_LAST];
size_t rndv_pipeline_send_thresh;
/* "tm_" settings; presumably related to tag matching - confirm */
size_t tm_thresh;
size_t tm_force_thresh;
size_t tm_max_bb_size;
int tm_sw_rndv;
/* Worker-address related settings */
int address_debug_info;
unsigned max_worker_address_name;
/* Atomic operations mode selector (see ucp_atomic_mode_t) */
ucp_atomic_mode_t atomic_mode;
/* Integer switches; presumably booleans - confirm */
int use_mt_mutex;
int adaptive_progress;
/* Upper bounds on the number of lanes per protocol family */
unsigned max_eager_lanes;
unsigned max_rndv_lanes;
/* Configured estimates; note that ucp_context::config keeps separate int
 * members est_num_eps/est_num_ppn which the inline helpers actually read */
size_t estimated_num_eps;
size_t estimated_num_ppn;
int flush_worker_eps;
int unified_mode;
/* Connection-manager / listener settings */
int cm_use_all_devices;
size_t listener_backlog;
int proto_enable;
/* Keepalive settings: a time interval and an endpoint count */
ucs_time_t keepalive_interval;
unsigned keepalive_num_eps;
/* Tri-state (on/off/auto) setting */
ucs_on_off_auto_value_t proto_indirect_id;
/* Bitmap-valued setting; bit meaning is not visible here - TODO confirm */
unsigned reg_whole_alloc_bitmap;
int rndv_put_force_flush;
size_t rndv_memtype_direct_size;
/* Version selectors (see ucp_object_version_t) */
ucp_object_version_t sa_client_min_hdr_version;
int rkey_mpool_max_md;
ucp_object_version_t worker_addr_version;
} ucp_context_config_t;
/*
 * String-array configuration value type, declared through
 * UCS_CONFIG_STRING_ARRAY_FIELD with "names" as the array member name.
 * Used by ucp_config::rndv_frag_sizes and ucp_config::rndv_frag_elems.
 */
typedef UCS_CONFIG_STRING_ARRAY_FIELD(names) ucp_context_config_names_t;
/*
 * UCP configuration object (the type behind ucp_config_t).
 * It holds list- and string-valued settings plus an embedded
 * ucp_context_config_t with the plain-value settings.
 *
 * NOTE(review): field notes are inferred from names and types - confirm
 * against the configuration table that fills this structure.
 */
struct ucp_config {
/* One name list per UCT device type */
ucs_config_names_array_t devices[UCT_DEVICE_TYPE_LAST];
/* Allow-lists; presumably for transports and for protocols respectively */
ucs_config_allow_list_t tls;
ucs_config_allow_list_t protos;
/* String array; presumably allocation methods in priority order */
UCS_CONFIG_STRING_ARRAY_FIELD(methods) alloc_prio;
/* String arrays with rendezvous fragment settings */
ucp_context_config_names_t rndv_frag_sizes;
ucp_context_config_names_t rndv_frag_elems;
/* String arrays of transport names used for sockaddr / CM - confirm */
UCS_CONFIG_STRING_ARRAY_FIELD(aux_tls) sockaddr_aux_tls;
UCS_CONFIG_STRING_ARRAY_FIELD(cm_tls) sockaddr_cm_tls;
int warn_invalid_config;
/* String settings; ownership of the pointed-to memory is not visible here */
char *env_prefix;
char *selection_cmp;
/* Plain-value context settings */
ucp_context_config_t ctx;
/* List head; ucp_context has a member of the same name */
ucs_list_link_t cached_key_list;
/* Array of size_t values */
UCS_CONFIG_ARRAY_FIELD(size_t, memunits) mpool_sizes;
};
/*
 * UCP-level description of one transport resource: the UCT resource
 * descriptor plus UCP bookkeeping. ucp_context::tl_rscs points to elements of
 * this type.
 */
typedef struct ucp_tl_resource_desc {
/* UCT transport resource descriptor */
uct_tl_resource_desc_t tl_rsc;
/* 16-bit checksum of the transport name; ucp_find_tl_name_by_csum() takes a
 * value of this kind */
uint16_t tl_name_csum;
/* Index of a memory domain; presumably into ucp_context::tl_mds - confirm */
ucp_md_index_t md_index;
/* Device index; presumably shared by resources on the same device - confirm */
ucp_rsc_index_t dev_index;
/* Flag bits; presumably UCP_TL_RSC_FLAG_xx values - confirm */
uint8_t flags;
} ucp_tl_resource_desc_t;
/*
 * Associates one alias string with up to 8 transport-name strings.
 * NOTE(review): whether the tls array is NULL-terminated when fewer than 8
 * entries are used is not visible here - confirm at the definition site.
 */
typedef struct ucp_tl_alias {
/* Alias name */
const char *alias;
/* Transport names associated with the alias */
const char* tls[8];
} ucp_tl_alias_t;
/*
 * A UCT component handle together with its attributes.
 * ucp_context::tl_cmpts points to elements of this type.
 */
typedef struct ucp_tl_cmpt {
/* UCT component handle */
uct_component_h cmpt;
/* Attributes of the component */
uct_component_attr_t attr;
} ucp_tl_cmpt_t;
/*
 * A UCT memory domain together with its descriptor and attributes.
 * ucp_context::tl_mds points to elements of this type.
 */
typedef struct ucp_tl_md {
/* UCT memory domain handle */
uct_md_h md;
/* Component index; presumably into ucp_context::tl_cmpts - confirm */
ucp_rsc_index_t cmpt_index;
/* Memory domain resource descriptor */
uct_md_resource_desc_t rsc;
/* Memory domain attributes */
uct_md_attr_t attr;
} ucp_tl_md_t;
/*
 * UCP context: the components, memory domains and transport resources in use,
 * plus the effective configuration.
 *
 * NOTE(review): unless a note cites a helper from this header, field meanings
 * are inferred from names and types - confirm against the context
 * initialization code.
 */
typedef struct ucp_context {
/* Component array and its length */
ucp_tl_cmpt_t *tl_cmpts;
ucp_rsc_index_t num_cmpts;
/* Memory domain array and its length */
ucp_tl_md_t *tl_mds;
ucp_md_index_t num_mds;
/* alloc_md_map plus a flag; presumably the map is filled lazily - confirm */
int alloc_md_map_initialized;
ucp_md_map_t alloc_md_map;
/* Memory domain indexes used for memory type detection and their count.
 * ucp_memory_detect_internal() reports host memory without any lookup when
 * the count is 0. */
ucp_md_index_t mem_type_detect_mds[UCS_MEMORY_TYPE_LAST];
ucp_md_index_t num_mem_type_detect_mds;
/* Bit mask; presumably one bit per supported memory type - confirm */
uint64_t mem_type_mask;
/* Transport resource array, a bitmap over it, and the resource count */
ucp_tl_resource_desc_t *tl_rscs;
ucp_tl_bitmap_t tl_bitmap;
ucp_rsc_index_t num_tls;
/* One transport bitmap per memory type */
ucp_tl_bitmap_t mem_type_access_tls[UCS_MEMORY_TYPE_LAST];
/* Protocol id mask */
ucp_proto_id_mask_t proto_bitmap;
struct {
/* Feature bits; tested by UCP_CONTEXT_CHECK_FEATURE_FLAGS */
uint64_t features;
uint64_t tag_sender_mask;
/* Estimated endpoint count; read by ucp_is_scalable_transport() and
 * ucp_tl_iface_latency() */
int est_num_eps;
/* Estimated processes per node; divisor in ucp_tl_iface_bandwidth() */
int est_num_ppn;
/* User request settings: a size plus init/cleanup callbacks */
struct {
size_t size;
ucp_request_init_callback_t init;
ucp_request_cleanup_callback_t cleanup;
} request;
/* Array of (allocation method, component name) pairs and its length */
struct {
uct_alloc_method_t method;
char cmpt_name[UCT_COMPONENT_NAME_MAX];
} *alloc_methods;
unsigned num_alloc_methods;
/* Connection-manager components: a bitmap, an index array and a count */
ucp_tl_bitmap_t cm_cmpts_bitmap;
ucp_rsc_index_t cm_cmpt_idxs[UCP_MAX_RESOURCES];
ucp_rsc_index_t num_cm_cmpts;
/* Plain-value settings (same type as ucp_config::ctx) */
ucp_context_config_t ext;
/* String settings; same names as in ucp_config */
char *env_prefix;
char *selection_cmp;
/* Array of sizes and its length; presumably for AM memory pools - confirm */
struct {
unsigned count;
size_t *sizes;
} am_mpools;
} config;
/* Context lock object (see ucp_mt_lock_t) */
ucp_mt_lock_t mt_lock;
/* Name of this context, at most UCP_ENTITY_NAME_MAX bytes */
char name[UCP_ENTITY_NAME_MAX];
/* List head; ucp_config has a member of the same name */
ucs_list_link_t cached_key_list;
} ucp_context_t;
/*
 * One entry of the ucp_am_handlers[] table. The features, cb, tracer and
 * flags members are assigned by UCP_DEFINE_AM; proxy_cb is assigned by
 * UCP_DEFINE_AM_PROXY.
 */
typedef struct ucp_am_handler {
/* Feature bits associated with this handler */
uint64_t features;
/* Active message callback; the generated counting proxy forwards to it */
uct_am_callback_t cb;
/* Tracer function for this active message */
ucp_am_tracer_t tracer;
/* Handler flags; the flag namespace is not visible here - TODO confirm */
uint32_t flags;
/* Callback that counts a received message and then calls cb */
uct_am_callback_t proxy_cb;
} ucp_am_handler_t;
/*
 * Pair of atomic-operation flag sets, one for atomic32 and one for atomic64.
 * Filled through the output argument of ucp_context_uct_atomic_iface_flags().
 */
typedef struct ucp_tl_iface_atomic_flags {
struct {
/* Flags for atomic operations; presumably the non-fetching ones - confirm */
uint64_t op_flags;
/* Flags for atomic operations; presumably the fetching ones - confirm */
uint64_t fop_flags;
} atomic32, atomic64;
} ucp_tl_iface_atomic_flags_t;
/* Bit mask covering the ADD, AND, OR and XOR UCT atomic operations */
#define UCP_ATOMIC_OP_MASK (UCS_BIT(UCT_ATOMIC_OP_ADD) | \
UCS_BIT(UCT_ATOMIC_OP_AND) | \
UCS_BIT(UCT_ATOMIC_OP_OR) | \
UCS_BIT(UCT_ATOMIC_OP_XOR))
/*
 * Bit mask covering the ADD, AND, OR, XOR, SWAP and CSWAP UCT atomic
 * operations, i.e. UCP_ATOMIC_OP_MASK plus SWAP and CSWAP.
 */
#define UCP_ATOMIC_FOP_MASK (UCS_BIT(UCT_ATOMIC_OP_ADD) | \
UCS_BIT(UCT_ATOMIC_OP_AND) | \
UCS_BIT(UCT_ATOMIC_OP_OR) | \
UCS_BIT(UCT_ATOMIC_OP_XOR) | \
UCS_BIT(UCT_ATOMIC_OP_SWAP) | \
UCS_BIT(UCT_ATOMIC_OP_CSWAP))
/*
 * Fill the ucp_am_handlers[_id] entry with the given feature bits, callback,
 * tracer and flags from inside a UCS_STATIC_INIT block.
 * NOTE(review): UCS_STATIC_INIT presumably runs the block at load time -
 * confirm at its definition.
 *
 * _features  Value stored in the entry's features member.
 * _id        Index into ucp_am_handlers[].
 * _cb        Value stored in the entry's cb member.
 * _tracer    Value stored in the entry's tracer member.
 * _flags     Value stored in the entry's flags member.
 */
#define UCP_DEFINE_AM(_features, _id, _cb, _tracer, _flags) \
UCS_STATIC_INIT { \
ucp_am_handlers[_id].features = _features; \
ucp_am_handlers[_id].cb = _cb; \
ucp_am_handlers[_id].tracer = _tracer; \
ucp_am_handlers[_id].flags = _flags; \
}
/*
 * Generate a counting proxy callback for active message _id and store it in
 * ucp_am_handlers[_id].proxy_cb from inside a UCS_STATIC_INIT block.
 *
 * The generated static function, ucp_am_<_id>_counting_proxy, treats its
 * first argument as a ucp_worker_iface_t pointer, increments that object's
 * proxy_recv_count, and then calls ucp_am_handlers[_id].cb with
 * wiface->worker in place of the original argument, returning its status.
 *
 * _id is token-pasted into the function name, so it must expand to something
 * that forms a valid identifier there.
 */
#define UCP_DEFINE_AM_PROXY(_id) \
\
static ucs_status_t \
ucp_am_##_id##_counting_proxy(void *arg, void *data, size_t length, \
unsigned flags) \
{ \
ucp_worker_iface_t *wiface = arg; \
wiface->proxy_recv_count++; \
return ucp_am_handlers[_id].cb(wiface->worker, data, length, flags); \
} \
\
UCS_STATIC_INIT { \
ucp_am_handlers[_id].proxy_cb = ucp_am_##_id##_counting_proxy; \
}
/*
 * If _param is NULL: log an error naming the parameter, assign
 * UCS_ERR_INVALID_PARAM to _status, and execute _action.
 *
 * _param   Pointer expression to test; its source text appears in the message.
 * _status  Lvalue that receives UCS_ERR_INVALID_PARAM on failure.
 * _action  Statement executed after the status is set (written without a
 *          trailing semicolon; the macro appends one).
 *
 * NOTE(review): the expansion is a bare "if { ... };" and is not wrapped in
 * do { } while (0). Using it as the body of an unbraced if that has an else
 * branch will therefore not compile, and the trailing ';' adds an empty
 * statement. Wrapping it would change how a 'break'/'continue' passed as
 * _action behaves, so audit all callers before changing the form.
 */
#define UCP_CHECK_PARAM_NON_NULL(_param, _status, _action) \
if ((_param) == NULL) { \
ucs_error("the parameter %s must not be NULL", #_param); \
(_status) = UCS_ERR_INVALID_PARAM; \
_action; \
};
/*
 * When ENABLE_PARAMS_CHECK is non-zero, verify that the context was created
 * with at least one of the feature bits in _flags. If none of them is set in
 * (_context)->config.features, log an error listing the bits of _flags that
 * are missing and execute _action.
 *
 * Note the test is "no requested bit is set": a context that has only some of
 * the bits in _flags passes the check.
 *
 * The message buffer (512 bytes) is obtained with ucs_alloca on the failure
 * path only. The body is wrapped in do { } while (0), so a 'break' or
 * 'continue' passed as _action applies to that wrapper loop rather than to
 * an enclosing loop of the caller.
 *
 * _context  Context whose config.features is tested.
 * _flags    Feature bits to look for.
 * _action   Statement executed when the check fails.
 */
#define UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, _flags, _action) \
do { \
if (ENABLE_PARAMS_CHECK && \
ucs_unlikely(!((_context)->config.features & (_flags)))) { \
size_t feature_list_str_max = 512; \
char *feature_list_str = ucs_alloca(feature_list_str_max); \
ucs_error("feature flags %s were not set for ucp_init()", \
ucs_flags_str(feature_list_str, feature_list_str_max, \
(_flags) & ~(_context)->config.features, \
ucp_feature_str)); \
_action; \
} \
} while (0)
/*
 * Forward to UCS_PARAM_VALUE using the prefix UCP_<_obj>_PARAM_FIELD, which
 * is built by token-pasting _obj between "UCP_" and "_PARAM_FIELD".
 * NOTE(review): presumably yields (_params)->_name when the matching field
 * flag is set and _default otherwise - confirm at UCS_PARAM_VALUE.
 */
#define UCP_PARAM_VALUE(_obj, _params, _name, _flag, _default) \
UCS_PARAM_VALUE(UCS_PP_TOKENPASTE3(UCP_, _obj, _PARAM_FIELD), _params, \
_name, _flag, _default)
/*
 * Forward to UCS_PARAM_VALUE using the fixed prefix UCP_PARAM_FIELD, i.e. the
 * variant of UCP_PARAM_VALUE without an object name in the prefix.
 */
#define UCP_PARAM_FIELD_VALUE(_params, _name, _flag, _default) \
UCS_PARAM_VALUE(UCP_PARAM_FIELD, _params, _name, _flag, _default)
/*
 * Forward to UCS_PARAM_VALUE using the prefix UCP_<_obj>_ATTR_FIELD, which is
 * built by token-pasting _obj between "UCP_" and "_ATTR_FIELD". This is the
 * attribute-structure counterpart of UCP_PARAM_VALUE.
 */
#define UCP_ATTR_VALUE(_obj, _attrs, _name, _flag, _default) \
UCS_PARAM_VALUE(UCS_PP_TOKENPASTE3(UCP_, _obj, _ATTR_FIELD), _attrs, \
_name, _flag, _default)
/*
 * Assert (via ucs_assert) that the memory type detected for the given buffer
 * equals _mem_type.
 * NOTE(review): this expands to a call to ucp_memory_type_detect(), which is
 * not declared in this header (the helpers defined here are named
 * ucp_memory_detect*). Confirm that function exists wherever the macro is
 * used with assertions enabled.
 */
#define ucp_assert_memtype(_context, _buffer, _length, _mem_type) \
ucs_assert(ucp_memory_type_detect(_context, _buffer, _length) == (_mem_type))
/* Active message handler table, indexed by active message id; its entries
 * are filled by UCP_DEFINE_AM and UCP_DEFINE_AM_PROXY */
extern ucp_am_handler_t ucp_am_handlers[];
/* Feature name strings; passed to ucs_flags_str() by
 * UCP_CONTEXT_CHECK_FEATURE_FLAGS to print feature bits */
extern const char *ucp_feature_str[];
/* Operation name strings; the indexing scheme is not visible in this header */
extern const char *ucp_operation_names[];
/*
 * NOTE(review): the functions declared below are defined outside this header.
 * The summaries are inferred from names and signatures only - confirm against
 * the definitions.
 */

/* Presumably writes a printable form of the 'length' bytes at 'data' into
 * 'buffer', which holds at most 'max' characters */
void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
const void *data, size_t length);
/* Presumably enables tag offload for the context */
void ucp_context_tag_offload_enable(ucp_context_h context);
/* Fills 'atomic' with atomic operation flags for the context */
void ucp_context_uct_atomic_iface_flags(ucp_context_h context,
ucp_tl_iface_atomic_flags_t *atomic);
/* Presumably returns the name of the transport whose name checksum equals
 * 'tl_name_csum'; behavior when nothing matches is not visible here */
const char * ucp_find_tl_name_by_csum(ucp_context_t *context, uint16_t tl_name_csum);
/* Presumably formats 'tl_bitmap' into 'str' (capacity 'max_str_len') and
 * returns a pointer to the text */
const char *ucp_tl_bitmap_str(ucp_context_h context,
const ucp_tl_bitmap_t *tl_bitmap, char *str,
size_t max_str_len);
/* Presumably formats 'feature_flags' into 'str' (capacity 'max_str_len') and
 * returns a pointer to the text */
const char* ucp_feature_flags_str(unsigned feature_flags, char *str,
size_t max_str_len);
/* Slow-path memory detection; ucp_memory_detect_internal() calls it when the
 * memory type cache is unsupported or returns incomplete information */
void ucp_memory_detect_slowpath(ucp_context_h context, const void *address,
size_t length, ucs_memory_info_t *mem_info);
/**
 * Tolerance used when comparing two scores: one millionth of their sum.
 *
 * @param [in] val1  First value.
 * @param [in] val2  Second value.
 *
 * @return (val1 + val2) scaled by 1e-6.
 */
static UCS_F_ALWAYS_INLINE
double ucp_calc_epsilon(double val1, double val2)
{
    const double relative_tolerance = 1e-6;

    return (val1 + val2) * relative_tolerance;
}
/**
 * Compare two scores with a tolerance.
 *
 * The scores are considered equal when the magnitude of their difference is
 * below ucp_calc_epsilon(score1, score2).
 *
 * @param [in] score1  First score.
 * @param [in] score2  Second score.
 *
 * @return 0 if the scores are within the tolerance of each other, otherwise
 *         ucs_signum() of (score1 - score2).
 */
static UCS_F_ALWAYS_INLINE
int ucp_score_cmp(double score1, double score2)
{
    double delta = score1 - score2;

    if (fabs(delta) < ucp_calc_epsilon(score1, score2)) {
        return 0;
    }

    return ucs_signum(delta);
}
/**
 * Compare two (score, priority) pairs.
 *
 * The scores are compared first with ucp_score_cmp(); the priorities are
 * consulted only when that comparison reports a tie.
 *
 * @param [in] score1  Score of the first candidate.
 * @param [in] prio1   Priority of the first candidate.
 * @param [in] score2  Score of the second candidate.
 * @param [in] prio2   Priority of the second candidate.
 *
 * @return The non-zero result of ucp_score_cmp() when the scores differ;
 *         otherwise 1, 0 or -1 when prio1 is greater than, equal to or less
 *         than prio2.
 */
static UCS_F_ALWAYS_INLINE
int ucp_score_prio_cmp(double score1, int prio1, double score2, int prio2)
{
    int score_res = ucp_score_cmp(score1, score2);

    if (score_res != 0) {
        return score_res;
    }

    /* Compare directly instead of taking the sign of (prio1 - prio2): the
     * subtraction overflows a signed int when the operands are far apart,
     * which is undefined behavior */
    return (prio1 > prio2) - (prio1 < prio2);
}
/**
 * Check whether a transport endpoint limit covers the estimated number of
 * endpoints of the context.
 *
 * @param [in] context      UCP context; its config.est_num_eps is read.
 * @param [in] max_num_eps  Endpoint limit to test.
 *
 * @return Non-zero if @a max_num_eps is greater than or equal to
 *         config.est_num_eps converted to size_t, zero otherwise.
 */
static UCS_F_ALWAYS_INLINE
int ucp_is_scalable_transport(ucp_context_h context, size_t max_num_eps)
{
    size_t required_eps = (size_t)context->config.est_num_eps;

    return (max_num_eps >= required_eps);
}
/**
 * Evaluate an interface latency function at the estimated number of
 * endpoints of the context.
 *
 * @param [in] context  UCP context; its config.est_num_eps is the argument.
 * @param [in] latency  Linear latency function to evaluate.
 *
 * @return Result of ucs_linear_func_apply() for @a latency at
 *         config.est_num_eps.
 */
static UCS_F_ALWAYS_INLINE double
ucp_tl_iface_latency(ucp_context_h context, const ucs_linear_func_t *latency)
{
    const int num_eps = context->config.est_num_eps;

    return ucs_linear_func_apply(*latency, num_eps);
}
/**
 * Compute the bandwidth available to one process: the dedicated part plus an
 * equal share of the shared part.
 *
 * @param [in] context    UCP context; its config.est_num_ppn is the divisor
 *                        applied to the shared bandwidth.
 * @param [in] bandwidth  Dedicated and shared bandwidth values.
 *
 * @return bandwidth->dedicated + bandwidth->shared / config.est_num_ppn.
 */
static UCS_F_ALWAYS_INLINE double
ucp_tl_iface_bandwidth(ucp_context_h context, const uct_ppn_bandwidth_t *bandwidth)
{
    double shared_per_process = bandwidth->shared /
                                context->config.est_num_ppn;

    return bandwidth->dedicated + shared_per_process;
}
/**
 * Mark a memory info structure as host memory on an unknown system device.
 *
 * @param [out] mem_info  Structure to update; type is set to
 *                        UCS_MEMORY_TYPE_HOST and sys_dev to
 *                        UCS_SYS_DEVICE_ID_UNKNOWN.
 */
static UCS_F_ALWAYS_INLINE void
ucp_memory_info_set_host(ucp_memory_info_t *mem_info)
{
    mem_info->sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN;
    mem_info->type    = UCS_MEMORY_TYPE_HOST;
}
/**
 * Detect the memory type and system device of an address range.
 *
 * If the context has no memory-type-detection memory domains, or the memory
 * type cache has no element for the range, the result is set to host memory.
 * If the cache is unsupported, or it succeeds but leaves the type or the
 * system device unknown, detection is delegated to
 * ucp_memory_detect_slowpath(). For any other outcome the lookup status is
 * asserted to be UCS_OK and the cache result is kept as is.
 *
 * @param [in]  context   UCP context.
 * @param [in]  address   Start of the range to inspect.
 * @param [in]  length    Length of the range.
 * @param [out] mem_info  Filled with the detected memory information.
 */
static UCS_F_ALWAYS_INLINE void
ucp_memory_detect_internal(ucp_context_h context, const void *address,
                           size_t length, ucs_memory_info_t *mem_info)
{
    ucs_status_t status;

    /* Nothing can detect non-host memory: report host right away */
    if (ucs_likely(context->num_mem_type_detect_mds == 0)) {
        ucs_memory_info_set_host(mem_info);
        return;
    }

    status = ucs_memtype_cache_lookup(address, length, mem_info);

    /* Range is not in the cache: treat it as host memory */
    if (ucs_likely(status == UCS_ERR_NO_ELEM)) {
        ucs_memory_info_set_host(mem_info);
        return;
    }

    /* Cache is unavailable or its answer is incomplete: use the slow path */
    if ((status == UCS_ERR_UNSUPPORTED) ||
        ((status == UCS_OK) &&
         ((mem_info->type == UCS_MEMORY_TYPE_UNKNOWN) ||
          (mem_info->sys_dev == UCS_SYS_DEVICE_ID_UNKNOWN)))) {
        ucp_memory_detect_slowpath(context, address, length, mem_info);
        return;
    }

    ucs_assertv(status == UCS_OK, "%s (%d)", ucs_status_string(status),
                status);
}
/**
 * Detect the memory type and system device of an address range and store
 * them in a ucp_memory_info_t.
 *
 * Runs ucp_memory_detect_internal() on a temporary ucs_memory_info_t and
 * copies only its type and sys_dev members to the output.
 *
 * @param [in]  context   UCP context.
 * @param [in]  address   Start of the range to inspect.
 * @param [in]  length    Length of the range.
 * @param [out] mem_info  Receives the detected type and system device.
 */
static UCS_F_ALWAYS_INLINE void
ucp_memory_detect(ucp_context_h context, const void *address, size_t length,
                  ucp_memory_info_t *mem_info)
{
    ucs_memory_info_t detected;

    ucp_memory_detect_internal(context, address, length, &detected);

    mem_info->sys_dev = detected.sys_dev;
    mem_info->type    = detected.type;
}
/*
 * NOTE(review): the functions declared below are defined outside this header.
 * The summaries are inferred from names and signatures only - confirm against
 * the definitions.
 */

/* Presumably fills 'tl_bitmap' with the transport resources that belong to
 * the device named 'dev_name' */
void
ucp_context_dev_tl_bitmap(ucp_context_h context, const char *dev_name,
ucp_tl_bitmap_t *tl_bitmap);
/* Presumably fills 'tl_bitmap' with the transport resources whose device
 * index equals 'dev_idx' */
void
ucp_context_dev_idx_tl_bitmap(ucp_context_h context, ucp_rsc_index_t dev_idx,
ucp_tl_bitmap_t *tl_bitmap);
/* Presumably checks that 'tl_bitmap' is contained in 'tl_bitmap_super'; how a
 * violation is reported is not visible here */
void ucp_tl_bitmap_validate(const ucp_tl_bitmap_t *tl_bitmap,
const ucp_tl_bitmap_t *tl_bitmap_super);
/* Presumably returns the name of the connection manager at index 'cm_idx' */
const char* ucp_context_cm_name(ucp_context_h context, ucp_rsc_index_t cm_idx);
/* Presumably sets configuration entry 'name' to 'value' in 'config' and
 * returns a status code */
ucs_status_t
ucp_config_modify_internal(ucp_config_t *config, const char *name,
const char *value);
/* Presumably applies the context's stored UCT settings to the opaque UCT
 * configuration object 'config' */
void ucp_apply_uct_config_list(ucp_context_h context, void *config);
#endif