#ifndef UCP_WORKER_INL_
#define UCP_WORKER_INL_
#include "ucp_worker.h"
#include "ucp_rkey.inl"
#include <ucp/core/ucp_request.h>
#include <ucp/wireup/address.h>
#include <ucs/datastruct/ptr_map.inl>
/* Instantiate the inline pointer-map implementation named "ep". It is the map
 * flavor referenced by UCS_PTR_MAP_GET(ep, ...) in ucp_worker_get_ep_by_id().
 * NOTE(review): the meaning of the second argument (1) is defined by
 * UCS_PTR_MAP_IMPL in ucs/datastruct/ptr_map.inl - confirm there. */
UCS_PTR_MAP_IMPL(ep, 1);
/* Instantiate the khash table "ucp_worker_rkey_config": keys are
 * ucp_rkey_config_key_t, values are ucp_worker_cfg_index_t, and
 * ucp_rkey_config_hash_func / ucp_rkey_config_is_equal are used to hash and
 * compare keys. The literal 1 is presumably khash's "is map" flag (values are
 * read with kh_val() in ucp_worker_rkey_config_get()) - confirm in khash.h. */
KHASH_IMPL(ucp_worker_rkey_config, ucp_rkey_config_key_t,
ucp_worker_cfg_index_t, 1, ucp_rkey_config_hash_func,
ucp_rkey_config_is_equal);
/**
 * Get the rkey configuration index that corresponds to @a key.
 *
 * The key is first searched in worker->rkey_config_hash. On a hit, the stored
 * index is written to @a cfg_index_p. On a miss, the request is forwarded to
 * ucp_worker_add_rkey_config(), which receives all four arguments unchanged.
 *
 * @param [in]  worker          Worker whose rkey configuration hash is used.
 * @param [in]  key             Rkey configuration key to look up.
 * @param [in]  lanes_distance  Passed through to ucp_worker_add_rkey_config()
 *                              when the key is not in the hash.
 * @param [out] cfg_index_p     Filled with the configuration index.
 *
 * @return UCS_OK if the key was found in the hash, otherwise the status
 *         returned by ucp_worker_add_rkey_config().
 */
static UCS_F_ALWAYS_INLINE ucs_status_t ucp_worker_rkey_config_get(
        ucp_worker_h worker, const ucp_rkey_config_key_t *key,
        const ucs_sys_dev_distance_t *lanes_distance,
        ucp_worker_cfg_index_t *cfg_index_p)
{
    ucs_status_t status;
    khiter_t iter;

    iter = kh_get(ucp_worker_rkey_config, &worker->rkey_config_hash, *key);
    if (ucs_likely(iter != kh_end(&worker->rkey_config_hash))) {
        /* Expected path: the configuration is already in the hash */
        *cfg_index_p = kh_val(&worker->rkey_config_hash, iter);
        status       = UCS_OK;
    } else {
        /* Not in the hash: let the worker add it */
        status = ucp_worker_add_rkey_config(worker, key, lanes_distance,
                                            cfg_index_p);
    }

    return status;
}
/**
 * Hash function for memory pool keys.
 *
 * Combines the two key fields: the memory type, cast to khint_t, is XOR-ed
 * with the system device value shifted left by 8 bits.
 *
 * @param [in] mpool_key  Key to hash.
 *
 * @return Hash value of @a mpool_key.
 */
static UCS_F_ALWAYS_INLINE khint_t
ucp_worker_mpool_hash_func(ucp_worker_mpool_key_t mpool_key)
{
    khint_t hash = (khint_t)mpool_key.mem_type;

    hash ^= (mpool_key.sys_dev << 8);
    return hash;
}
/**
 * Equality predicate for memory pool keys.
 *
 * @param [in] mpool_key1  First key.
 * @param [in] mpool_key2  Second key.
 *
 * @return 1 if both the system device and the memory type of the two keys
 *         match, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_mpool_key_is_equal(ucp_worker_mpool_key_t mpool_key1,
                              ucp_worker_mpool_key_t mpool_key2)
{
    if (mpool_key1.sys_dev != mpool_key2.sys_dev) {
        return 0;
    }

    return mpool_key1.mem_type == mpool_key2.mem_type;
}
/* Instantiate the khash table "ucp_worker_mpool_hash": keys are
 * ucp_worker_mpool_key_t, values are ucs_mpool_t, hashed with
 * ucp_worker_mpool_hash_func and compared with ucp_worker_mpool_key_is_equal.
 * The literal 1 is presumably khash's "is map" flag - confirm in khash.h. */
KHASH_IMPL(ucp_worker_mpool_hash, ucp_worker_mpool_key_t, ucs_mpool_t,
1, ucp_worker_mpool_hash_func, ucp_worker_mpool_key_is_equal);
/**
 * Get the address name stored in the worker.
 *
 * @param [in] worker  Worker to query.
 *
 * @return worker->address_name. The storage belongs to the worker object.
 */
static UCS_F_ALWAYS_INLINE const char*
ucp_worker_get_address_name(ucp_worker_h worker)
{
    const char *address_name = worker->address_name;

    return address_name;
}
/**
 * Resolve an endpoint id to an endpoint handle using the worker's ep_map.
 *
 * @param [in]  worker  Worker whose endpoint pointer map is searched.
 * @param [in]  id      Endpoint id; asserted to differ from
 *                      UCS_PTR_MAP_KEY_INVALID.
 * @param [out] ep_p    Filled with the endpoint on success. Left untouched
 *                      when an error status is returned.
 *
 * @return UCS_OK when the map lookup returned UCS_OK or UCS_ERR_NO_PROGRESS,
 *         otherwise the status returned by the map lookup.
 *
 * @note UCS_ERR_NO_PROGRESS from the lookup is deliberately treated as
 *       success here. NOTE(review): its exact meaning is defined by the ptr_map
 *       implementation - confirm in ptr_map.inl.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_worker_get_ep_by_id(ucp_worker_h worker, ucs_ptr_map_key_t id,
                        ucp_ep_h *ep_p)
{
    void *map_value;
    ucs_status_t status;
    ucp_ep_h ep;

    ucs_assert(id != UCS_PTR_MAP_KEY_INVALID);

    status = UCS_PTR_MAP_GET(ep, &worker->ep_map, id, 0, &map_value);
    if (ucs_unlikely(!((status == UCS_OK) ||
                       (status == UCS_ERR_NO_PROGRESS)))) {
        return status;
    }

    ep    = (ucp_ep_h)map_value;
    *ep_p = ep;

    /* The endpoint found by id must belong to the worker it was looked up on */
    ucs_assertv(ep->worker == worker, "worker=%p ep=%p ep->worker=%p",
                worker, ep, ep->worker);
    return UCS_OK;
}
/**
 * Check whether keepalive is enabled by the context configuration.
 *
 * @param [in] worker  Worker whose context configuration is inspected.
 *
 * @return Nonzero if keepalive_num_eps is not 0 and keepalive_interval is not
 *         UCS_TIME_INFINITY, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_keepalive_is_enabled(ucp_worker_h worker)
{
    if (worker->context->config.ext.keepalive_num_eps == 0) {
        return 0;
    }

    return worker->context->config.ext.keepalive_interval != UCS_TIME_INFINITY;
}
/**
 * Get the worker interface that corresponds to a context resource index.
 *
 * worker->ifaces is indexed by the number of bits set in the context's
 * tl_bitmap up to @a rsc_index (as computed by
 * UCS_BITMAP_POPCOUNT_UPTO_INDEX), not by the resource index itself.
 *
 * @param [in] worker     Worker that owns the interfaces.
 * @param [in] rsc_index  Resource index, or UCP_NULL_RESOURCE.
 *
 * @return NULL if @a rsc_index is UCP_NULL_RESOURCE, otherwise the matching
 *         entry of worker->ifaces. The resource is asserted to be set in the
 *         context's tl_bitmap.
 */
static UCS_F_ALWAYS_INLINE ucp_worker_iface_t*
ucp_worker_iface(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    ucp_tl_bitmap_t tl_bitmap;

    if (rsc_index != UCP_NULL_RESOURCE) {
        tl_bitmap = worker->context->tl_bitmap;
        ucs_assert(UCS_BITMAP_GET(tl_bitmap, rsc_index));
        return worker->ifaces[UCS_BITMAP_POPCOUNT_UPTO_INDEX(tl_bitmap,
                                                             rsc_index)];
    }

    return NULL;
}
/**
 * Get the interface attributes of the worker interface for a resource.
 *
 * @param [in] worker     Worker that owns the interface.
 * @param [in] rsc_index  Resource index of the interface.
 *
 * @return Address of the attr member of the interface returned by
 *         ucp_worker_iface().
 *
 * @note ucp_worker_iface() returns NULL for UCP_NULL_RESOURCE and the result
 *       is not checked here, so callers must pass a valid resource index.
 */
static UCS_F_ALWAYS_INLINE uct_iface_attr_t*
ucp_worker_iface_get_attr(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    return &(ucp_worker_iface(worker, rsc_index)->attr);
}
/**
 * Get the bandwidth of a worker interface.
 *
 * @param [in] worker     Worker that owns the interface.
 * @param [in] rsc_index  Resource index of the interface.
 *
 * @return Result of ucp_tl_iface_bandwidth() applied to the worker's context
 *         and the bandwidth member of the interface attributes.
 */
static UCS_F_ALWAYS_INLINE double
ucp_worker_iface_bandwidth(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    return ucp_tl_iface_bandwidth(
            worker->context,
            &ucp_worker_iface_get_attr(worker, rsc_index)->bandwidth);
}
/**
 * Read the unified_mode setting from the worker's context configuration.
 *
 * @param [in] worker  Worker whose context configuration is inspected.
 *
 * @return Value of context->config.ext.unified_mode.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_is_unified_mode(ucp_worker_h worker)
{
    int unified_mode = worker->context->config.ext.unified_mode;

    return unified_mode;
}
/**
 * Get the num_cm_cmpts value stored in the worker's context configuration.
 *
 * @param [in] worker  Worker whose context configuration is inspected.
 *
 * @return Value of context->config.num_cm_cmpts.
 */
static UCS_F_ALWAYS_INLINE ucp_rsc_index_t
ucp_worker_num_cm_cmpts(const ucp_worker_h worker)
{
    ucp_rsc_index_t num_cm_cmpts = worker->context->config.num_cm_cmpts;

    return num_cm_cmpts;
}
/**
 * Check whether the interface capabilities include
 * UCT_IFACE_FLAG_CONNECT_TO_EP.
 *
 * @param [in] iface_attr  Interface attributes to test.
 *
 * @return 1 if the flag is set in iface_attr->cap.flags, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_iface_is_tl_p2p(const uct_iface_attr_t *iface_attr)
{
    return (iface_attr->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) != 0;
}
/**
 * Check whether the worker interface of a resource has the
 * connect-to-endpoint capability, as reported by
 * ucp_worker_iface_is_tl_p2p().
 *
 * @param [in] worker     Worker that owns the interface.
 * @param [in] rsc_index  Resource index of the interface.
 *
 * @return Result of ucp_worker_iface_is_tl_p2p() on the interface attributes.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_is_tl_p2p(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    int is_p2p;

    is_p2p = ucp_worker_iface_is_tl_p2p(
            ucp_worker_iface_get_attr(worker, rsc_index));
    return is_p2p;
}
/**
 * Check whether the worker interface of a resource has
 * UCT_IFACE_FLAG_CONNECT_TO_IFACE set in its capability flags.
 *
 * @param [in] worker     Worker that owns the interface.
 * @param [in] rsc_index  Resource index of the interface.
 *
 * @return 1 if the flag is set, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_worker_is_tl_2iface(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    return (ucp_worker_iface_get_attr(worker, rsc_index)->cap.flags &
            UCT_IFACE_FLAG_CONNECT_TO_IFACE) != 0;
}
/**
 * Check whether the worker interface of a resource has
 * UCT_IFACE_FLAG_CONNECT_TO_SOCKADDR set in its capability flags.
 *
 * @param [in] worker     Worker that owns the interface.
 * @param [in] rsc_index  Resource index of the interface.
 *
 * @return 1 if the flag is set, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE UCS_F_MAYBE_UNUSED int
ucp_worker_is_tl_2sockaddr(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
    return (ucp_worker_iface_get_attr(worker, rsc_index)->cap.flags &
            UCT_IFACE_FLAG_CONNECT_TO_SOCKADDR) != 0;
}
/**
 * Get the address pack flags that depend only on the context state.
 *
 * @param [in] worker  Worker whose context is inspected.
 *
 * @return UCP_ADDRESS_PACK_FLAG_SYS_DEVICE if the context has at least one
 *         memory-type-detecting memory domain
 *         (num_mem_type_detect_mds > 0), otherwise 0.
 */
static UCS_F_ALWAYS_INLINE unsigned
ucp_worker_common_address_pack_flags(ucp_worker_h worker)
{
    return (worker->context->num_mem_type_detect_mds > 0) ?
                   UCP_ADDRESS_PACK_FLAG_SYS_DEVICE :
                   0;
}
/**
 * Get the default set of address pack flags for a worker.
 *
 * @param [in] worker  Worker to build the flags for.
 *
 * @return The flags from ucp_worker_common_address_pack_flags() combined with
 *         the worker UUID, worker name, device address, interface address and
 *         endpoint address pack flags.
 */
static UCS_F_ALWAYS_INLINE unsigned
ucp_worker_default_address_pack_flags(ucp_worker_h worker)
{
    unsigned pack_flags = ucp_worker_common_address_pack_flags(worker);

    pack_flags |= UCP_ADDRESS_PACK_FLAG_WORKER_UUID |
                  UCP_ADDRESS_PACK_FLAG_WORKER_NAME |
                  UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR |
                  UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
                  UCP_ADDRESS_PACK_FLAG_EP_ADDR;
    return pack_flags;
}
/**
 * Look up an endpoint by id and execute @a _action if the lookup fails.
 *
 * Calls ucp_worker_get_ep_by_id(_worker, _ep_id, _ep_p). If the returned
 * status is not UCS_OK, a trace-data message is emitted and @a _action is
 * executed.
 *
 * @param _ep_p     Pointer to an endpoint handle, filled on success.
 * @param _worker   Worker to look the endpoint up on.
 * @param _ep_id    Endpoint id; printed with PRIx64, so it must be a 64-bit
 *                  integer value.
 * @param _action   Statement to execute on failure (without trailing ';').
 * @param _fmt_str  String literal appended to the trace message after
 *                  "drop "; it is concatenated at compile time, so it cannot
 *                  be a runtime string.
 * @param ...       Arguments consumed by @a _fmt_str.
 *
 * @note The expansion is a plain brace block, so a 'break' or 'continue'
 *       passed as @a _action applies to the caller's enclosing loop.
 */
#define UCP_WORKER_GET_EP_BY_ID(_ep_p, _worker, _ep_id, _action, _fmt_str, ...) \
    { \
        ucs_status_t __status; \
        \
        __status = ucp_worker_get_ep_by_id(_worker, _ep_id, _ep_p); \
        if (ucs_unlikely(__status != UCS_OK)) { \
            ucs_trace_data("worker %p: ep id 0x%" PRIx64 \
                           " was not found, drop " _fmt_str, \
                           _worker, _ep_id, ##__VA_ARGS__); \
            _action; \
        } \
    }
/**
 * Look up an endpoint by id and additionally reject endpoints that are
 * already closed.
 *
 * First expands UCP_WORKER_GET_EP_BY_ID() with the same arguments. Then, if
 * the UCP_EP_FLAG_CLOSED bit is set in (*_ep_p)->flags, a trace-data message
 * is emitted and @a _action is executed.
 *
 * @param _ep_p     Pointer to an endpoint handle, filled by the lookup.
 * @param _worker   Worker to look the endpoint up on.
 * @param _ep_id    Endpoint id; printed with PRIx64.
 * @param _action   Statement to execute when the endpoint is not found or is
 *                  closed (without trailing ';').
 * @param _fmt_str  String literal appended to the trace messages.
 * @param ...       Arguments consumed by @a _fmt_str.
 *
 * @note The closed-flag check dereferences *(_ep_p) unconditionally after the
 *       lookup step. @a _action is therefore expected to transfer control out
 *       of this block (e.g. return/goto) when the lookup fails; otherwise
 *       *(_ep_p) is read even though it was not set by the lookup.
 */
#define UCP_WORKER_GET_VALID_EP_BY_ID(_ep_p, _worker, _ep_id, _action, \
_fmt_str, ...) \
{ \
UCP_WORKER_GET_EP_BY_ID(_ep_p, _worker, _ep_id, _action, _fmt_str, \
##__VA_ARGS__); \
if (ucs_unlikely((*(_ep_p))->flags & UCP_EP_FLAG_CLOSED)) { \
ucs_trace_data("worker %p: ep id 0x%" PRIx64 " was already closed" \
" ep %p, drop " _fmt_str, \
_worker, _ep_id, *(_ep_p), ##__VA_ARGS__); \
_action; \
} \
}
#endif