#ifndef UCP_EP_H_
#define UCP_EP_H_
#include "ucp_types.h"
#include <ucp/proto/lane_type.h>
#include <ucp/proto/proto_select.h>
#include <ucp/wireup/ep_match.h>
#include <ucp/api/ucp.h>
#include <uct/api/uct.h>
#include <uct/api/v2/uct_v2.h>
#include <ucs/datastruct/queue.h>
#include <ucs/datastruct/ptr_map.h>
#include <ucs/datastruct/strided_alloc.h>
#include <ucs/debug/assert.h>
#include <ucs/stats/stats.h>
/* Element count of the zcopy_thresh[] and sync_zcopy_thresh[] arrays in
 * ucp_ep_msg_config_t; the value has type unsigned long.
 * NOTE(review): presumably the maximal number of IOV entries handled per
 * operation - confirm against the users of this constant. */
#define UCP_MAX_IOV 16UL
/*
 * Storage type of ucp_ep_t::flags.
 *
 * Builds with ENABLE_DEBUG_DATA or UCS_ENABLE_ASSERT get a 32-bit type, all
 * other builds a 16-bit one. The 16-bit variant cannot represent the
 * UCP_EP_FLAG_* values defined at bit 16 and above.
 * NOTE(review): presumably those high flags are only used in debug/assert
 * builds - confirm against the code that sets them.
 */
#if !(ENABLE_DEBUG_DATA || UCS_ENABLE_ASSERT)
typedef uint16_t ucp_ep_flags_t;
#else
typedef uint32_t ucp_ep_flags_t;
#endif
/*
 * Increment / decrement helpers for counters that are checked with
 * ucs_assert(). _counter is a pointer to the counter.
 *
 * Without UCS_ENABLE_ASSERT both macros expand to nothing, so the argument
 * is not evaluated. With UCS_ENABLE_ASSERT the argument is evaluated more
 * than once, so it must not have side effects.
 * NOTE(review): presumably meant for the assert-only counters of ucp_ep_t
 * (flush_iter_refcount, discard_refcount) - confirm against callers.
 */
#if !UCS_ENABLE_ASSERT

#define UCP_EP_ASSERT_COUNTER_INC(_counter)
#define UCP_EP_ASSERT_COUNTER_DEC(_counter)

#else

/* Assert that *_counter is below UINT_MAX, then add one to it */
#define UCP_EP_ASSERT_COUNTER_INC(_counter) \
    do { \
        ucs_assert(*(_counter) < UINT_MAX); \
        ++(*(_counter)); \
    } while (0)

/* Assert that *_counter is above zero, then subtract one from it */
#define UCP_EP_ASSERT_COUNTER_DEC(_counter) \
    do { \
        ucs_assert(*(_counter) > 0); \
        --(*(_counter)); \
    } while (0)

#endif
/* Shift amount (5 bits).
 * NOTE(review): presumably the position of a version field inside the 8-bit
 * ucp_wireup_sockaddr_data_base_t::header, which would leave bits 0..4 for
 * flags such as UCP_SA_DATA_FLAG_ERR_MODE_PEER - confirm against the code
 * that packs and unpacks the header. */
#define UCP_SA_DATA_HEADER_VERSION_SHIFT 5
/*
 * Bit values kept in ucp_ep_t::flags.
 *
 * Bits 8 and 15 are not assigned. Values at bit 16 and above do not fit the
 * 16-bit variant of ucp_ep_flags_t, which is selected when neither
 * ENABLE_DEBUG_DATA nor UCS_ENABLE_ASSERT is set.
 * NOTE(review): the group remarks below are inferred from the flag names
 * only - confirm against the code that sets and tests them.
 */
enum {
    /* Bits 0..7 */
    UCP_EP_FLAG_LOCAL_CONNECTED        = UCS_BIT(0),
    UCP_EP_FLAG_REMOTE_CONNECTED       = UCS_BIT(1),
    UCP_EP_FLAG_CONNECT_REQ_QUEUED     = UCS_BIT(2),
    UCP_EP_FLAG_FAILED                 = UCS_BIT(3),
    UCP_EP_FLAG_USED                   = UCS_BIT(4),
    UCP_EP_FLAG_STREAM_HAS_DATA        = UCS_BIT(5),
    UCP_EP_FLAG_ON_MATCH_CTX           = UCS_BIT(6),
    UCP_EP_FLAG_REMOTE_ID              = UCS_BIT(7),

    /* Bits 9..14 (bit 8 is skipped) */
    UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED = UCS_BIT(9),
    UCP_EP_FLAG_CLOSED                 = UCS_BIT(10),
    UCP_EP_FLAG_CLOSE_REQ_VALID        = UCS_BIT(11),
    UCP_EP_FLAG_ERR_HANDLER_INVOKED    = UCS_BIT(12),
    UCP_EP_FLAG_INTERNAL               = UCS_BIT(13),
    UCP_EP_FLAG_INDIRECT_ID            = UCS_BIT(14),

    /* Bits 16..25 (bit 15 is skipped): need the 32-bit ucp_ep_flags_t;
     * presumably wireup/connection tracing state */
    UCP_EP_FLAG_CONNECT_REQ_SENT       = UCS_BIT(16),
    UCP_EP_FLAG_CONNECT_REP_SENT       = UCS_BIT(17),
    UCP_EP_FLAG_CONNECT_ACK_SENT       = UCS_BIT(18),
    UCP_EP_FLAG_CONNECT_REQ_IGNORED    = UCS_BIT(19),
    UCP_EP_FLAG_CONNECT_PRE_REQ_SENT   = UCS_BIT(20),
    UCP_EP_FLAG_FLUSH_STATE_VALID      = UCS_BIT(21),
    UCP_EP_FLAG_DISCONNECTED_CM_LANE   = UCS_BIT(22),
    UCP_EP_FLAG_CLIENT_CONNECT_CB      = UCS_BIT(23),
    UCP_EP_FLAG_SERVER_NOTIFY_CB       = UCS_BIT(24),
    UCP_EP_FLAG_DISCONNECT_CB_CALLED   = UCS_BIT(25)
};
/*
 * Indices of the per-endpoint tag-send statistics counters. The suffix after
 * UCP_EP_STAT_TAG_TX_ is what UCP_EP_STAT_TAG_OP() token-pastes from its
 * _op argument. UCP_EP_STAT_LAST equals the number of counters.
 */
enum {
    UCP_EP_STAT_TAG_TX_EAGER = 0,
    UCP_EP_STAT_TAG_TX_EAGER_SYNC,
    UCP_EP_STAT_TAG_TX_RNDV,

    UCP_EP_STAT_LAST /* number of counters, not a counter itself */
};
/*
 * Endpoint initialization flag bits (bits 0..8, all assigned).
 * NOTE(review): presumably the values accepted by the "ep_init_flags"
 * parameters of the creation/wireup functions declared in this header (for
 * example ucp_worker_create_ep() and ucp_ep_init_create_wireup()) - confirm.
 */
enum {
    UCP_EP_INIT_FLAG_MEM_TYPE          = UCS_BIT(0),
    UCP_EP_INIT_CREATE_AM_LANE         = UCS_BIT(1),
    UCP_EP_INIT_CM_WIREUP_CLIENT       = UCS_BIT(2),
    UCP_EP_INIT_CM_WIREUP_SERVER       = UCS_BIT(3),
    UCP_EP_INIT_ERR_MODE_PEER_FAILURE  = UCS_BIT(4),
    UCP_EP_INIT_CM_PHASE               = UCS_BIT(5),
    UCP_EP_INIT_FLAG_INTERNAL          = UCS_BIT(6),
    UCP_EP_INIT_CONNECT_TO_IFACE_ONLY  = UCS_BIT(7),
    UCP_EP_INIT_CREATE_AM_LANE_ONLY    = UCS_BIT(8)
};
/*
 * Invoke UCS_STATS_UPDATE_COUNTER on (_ep)->stats for the counter
 * UCP_EP_STAT_TAG_TX_<_op>, passing 1 as the last argument (presumably the
 * increment - confirm against UCS_STATS_UPDATE_COUNTER).
 * _op is token-pasted, so it has to be the suffix of one of the
 * UCP_EP_STAT_TAG_TX_* enumerators (EAGER, EAGER_SYNC, RNDV).
 *
 * NOTE(review): the expansion already ends with ';'. A call written as
 * "UCP_EP_STAT_TAG_OP(ep, X);" therefore produces an extra empty statement,
 * which breaks an unbraced "if (...) UCP_EP_STAT_TAG_OP(...); else ...".
 * The semicolon is kept because callers are not visible from this header.
 */
#define UCP_EP_STAT_TAG_OP(_ep, _op) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1);
/*
 * Description of one lane; element type of ucp_ep_config_key::lanes[].
 * NOTE(review): the field remarks are inferred from names and types only -
 * confirm against the code that fills the key.
 */
typedef struct ucp_ep_config_key_lane {
    ucp_rsc_index_t      rsc_index;    /* resource index of the lane */
    ucp_md_index_t       dst_md_index; /* destination memory domain index */
    ucs_sys_device_t     dst_sys_dev;  /* destination system device */
    uint8_t              path_index;   /* path index */
    ucp_lane_type_mask_t lane_types;   /* mask of lane types */
    size_t               seg_size;     /* segment size */
} ucp_ep_config_key_lane_t;
/*
 * Endpoint configuration key: the lane table plus lane selections.
 * All lane arrays have UCP_MAX_LANES entries. dst_md_cmpts is a pointer, so
 * the data it refers to is stored outside of this structure.
 * NOTE(review): the remarks on what each lane selection is used for are
 * inferred from field names only - confirm against the wireup code.
 */
struct ucp_ep_config_key {
    ucp_lane_index_t         num_lanes;               /* presumably number of
                                                         used lanes[] entries */
    ucp_ep_config_key_lane_t lanes[UCP_MAX_LANES];    /* per-lane description */

    /* Single-lane selections */
    ucp_lane_index_t         am_lane;
    ucp_lane_index_t         tag_lane;
    ucp_lane_index_t         wireup_msg_lane;
    ucp_lane_index_t         cm_lane;

    /* Multi-lane selections (rkey_ptr_lane is a single lane) */
    ucp_lane_index_t         rma_lanes[UCP_MAX_LANES];
    ucp_lane_index_t         rma_bw_lanes[UCP_MAX_LANES];
    ucp_lane_index_t         rkey_ptr_lane;
    ucp_lane_index_t         amo_lanes[UCP_MAX_LANES];
    ucp_lane_index_t         am_bw_lanes[UCP_MAX_LANES];

    /* Memory domain maps */
    ucp_md_map_t             rma_bw_md_map;
    ucp_md_map_t             reachable_md_map;
    ucp_rsc_index_t          *dst_md_cmpts;           /* external array */

    ucp_lane_map_t           ep_check_map;
    ucp_err_handling_mode_t  err_mode;
};
/*
 * RMA size limits and thresholds; ucp_ep_config keeps one instance per lane
 * (rma[UCP_MAX_LANES]).
 * The two "short" limits are signed, all other members are unsigned.
 * NOTE(review): presumably a negative "short" limit means that the short
 * protocol is unavailable - confirm against the code that sets them.
 */
typedef struct ucp_ep_rma_config {
    /* put limits */
    ssize_t max_put_short;
    size_t  max_put_bcopy;
    size_t  max_put_zcopy;

    /* get limits */
    ssize_t max_get_short;
    size_t  max_get_bcopy;
    size_t  max_get_zcopy;

    /* zcopy thresholds */
    size_t  put_zcopy_thresh;
    size_t  get_zcopy_thresh;
} ucp_ep_rma_config_t;
/*
 * Message size limits and zcopy thresholds; used by ucp_ep_config for its
 * "am" and "tag.eager" members.
 * zcopy_thresh[] and sync_zcopy_thresh[] hold UCP_MAX_IOV entries each,
 * mem_type_zcopy_thresh[] holds UCS_MEMORY_TYPE_LAST entries.
 * NOTE(review): presumably the first two arrays are indexed by IOV count and
 * the third by memory type - confirm against the code that reads them.
 */
typedef struct ucp_ep_msg_config {
    ssize_t max_short;
    size_t  max_bcopy;
    size_t  max_zcopy;
    size_t  max_hdr;
    size_t  max_iov;

    size_t  zcopy_thresh[UCP_MAX_IOV];
    size_t  mem_type_zcopy_thresh[UCS_MEMORY_TYPE_LAST];
    size_t  sync_zcopy_thresh[UCP_MAX_IOV];

    uint8_t zcopy_auto_thresh; /* presumably a boolean-like switch */
} ucp_ep_msg_config_t;
/*
 * Pair of signed thresholds.
 * NOTE(review): presumably memtype_on applies when memory-type support is
 * enabled and memtype_off when it is not - confirm against the users.
 */
typedef struct ucp_memtype_thresh {
    ssize_t memtype_on;
    ssize_t memtype_off;
} ucp_memtype_thresh_t;
/*
 * Pair of rendezvous thresholds.
 * NOTE(review): presumably "remote" is for remote completion and "local" for
 * local completion - confirm against the users.
 */
typedef struct ucp_rndv_thresh {
    size_t remote;
    size_t local;
} ucp_rndv_thresh_t;
/*
 * Rendezvous zcopy configuration; ucp_ep_config::rndv holds one instance for
 * get_zcopy and one for put_zcopy. Note that the struct tag (ucp_rndv_zcopy)
 * differs from the typedef name.
 * lanes[] and scale[] both have UCP_MAX_LANES entries.
 * NOTE(review): presumably scale[i] is the share of lanes[i] - confirm.
 */
typedef struct ucp_rndv_zcopy {
    size_t           max;
    size_t           min;
    int              split;
    ucp_lane_index_t lanes[UCP_MAX_LANES];
    double           scale[UCP_MAX_LANES];
} ucp_ep_rndv_zcopy_config_t;
/*
 * Endpoint configuration: embeds the configuration key and groups further
 * settings per protocol family (rndv, tag, stream, am_u), followed by the
 * protocol selection data.
 * NOTE(review): remarks on the purpose of individual members are inferred
 * from names only - confirm against ucp_ep_config_init().
 */
struct ucp_ep_config {
    ucp_ep_config_key_t key;

    ucp_lane_map_t      p2p_lanes;
    ucp_ep_rma_config_t rma[UCP_MAX_LANES];      /* one entry per lane */
    size_t              bcopy_thresh;
    ucp_ep_msg_config_t am;
    ucp_md_index_t      md_index[UCP_MAX_LANES]; /* one entry per lane */

    /* Rendezvous settings */
    struct {
        ucp_ep_rndv_zcopy_config_t get_zcopy;
        ucp_ep_rndv_zcopy_config_t put_zcopy;
        ucp_rndv_thresh_t          rma_thresh;
        ucp_rndv_thresh_t          am_thresh;
        size_t                     rkey_size;
        ucp_md_map_t               rkey_ptr_dst_mds;
    } rndv;

    /* Tag matching settings */
    struct {
        const ucp_request_send_proto_t *proto;
        const ucp_request_send_proto_t *sync_proto;
        ucp_lane_index_t               lane;
        ucp_memtype_thresh_t           max_eager_short;
        ucp_ep_msg_config_t            eager;

        struct {
            ucp_rndv_thresh_t rma_thresh;
            ucp_rndv_thresh_t am_thresh;
        } rndv;

        struct {
            ucp_memtype_thresh_t max_eager_short;
            size_t               max_rndv_iov;
            size_t               max_rndv_zcopy;
        } offload;
    } tag;

    /* Stream settings */
    struct {
        const ucp_request_send_proto_t *proto;
    } stream;

    /* Active message settings */
    struct {
        const ucp_request_send_proto_t *proto;
        const ucp_request_send_proto_t *reply_proto;
        ucp_memtype_thresh_t           max_eager_short;
        ucp_memtype_thresh_t           max_reply_eager_short;
    } am_u;

    ucp_proto_select_t  proto_select;
};
/*
 * Endpoint object.
 * peer_name/name exist only with ENABLE_DEBUG_DATA; the two refcount-style
 * counters exist only with UCS_ENABLE_ASSERT. The statistics node is
 * declared through UCS_STATS_NODE_DECLARE (no trailing semicolon here; the
 * macro is expected to supply whatever it needs).
 * NOTE(review): field remarks are inferred from names - confirm.
 */
typedef struct ucp_ep {
    ucp_worker_h           worker;     /* owning worker */
    uint8_t                refcount;   /* 8-bit reference count */
    ucp_worker_cfg_index_t cfg_index;  /* configuration index */
    ucp_ep_match_conn_sn_t conn_sn;    /* connection sequence number */
    ucp_lane_index_t       am_lane;    /* lane index for active messages */
    ucp_ep_flags_t         flags;      /* UCP_EP_FLAG_* bits */
    uct_ep_h               uct_eps[UCP_MAX_LANES]; /* one handle per lane */

#if ENABLE_DEBUG_DATA
    char                   peer_name[UCP_WORKER_ADDRESS_NAME_MAX];
    char                   name[UCP_ENTITY_NAME_MAX];
#endif

#if UCS_ENABLE_ASSERT
    unsigned               flush_iter_refcount;
    unsigned               discard_refcount;
#endif

    UCS_STATS_NODE_DECLARE(stats)
} ucp_ep_t;
/*
 * Flush bookkeeping; stored in ucp_ep_ext_gen_t in a union with ep_match.
 * NOTE(review): presumably send_sn counts started operations and cmpl_sn
 * completed ones, with reqs holding requests waiting for completion -
 * confirm against the flush implementation.
 */
typedef struct {
    ucs_hlist_head_t reqs;
    uint32_t         send_sn;
    uint32_t         cmpl_sn;
} ucp_ep_flush_state_t;
/*
 * Wrapper around a single request pointer; embedded in ucp_ep_ext_control_t
 * as "close_req".
 */
typedef struct {
    ucp_request_t *req;
} ucp_ep_close_proto_req_t;
typedef struct {
ucp_rsc_index_t cm_idx;
ucs_ptr_map_key_t local_ep_id;
ucs_ptr_map_key_t remote_ep_id;
ucp_err_handler_cb_t err_cb;
ucp_ep_close_proto_req_t close_req;
#if UCS_ENABLE_ASSERT
ucs_time_t ka_last_round;
#endif
} ucp_ep_ext_control_t;
/*
 * Generic endpoint extension.
 * ep_match and flush_state are members of an anonymous union and therefore
 * share storage: only one of them is valid at any time.
 * NOTE(review): presumably UCP_EP_FLAG_ON_MATCH_CTX and
 * UCP_EP_FLAG_FLUSH_STATE_VALID tell which one is valid - confirm.
 */
typedef struct {
    void                 *user_data;
    ucs_list_link_t      ep_list;

    union {
        ucp_ep_match_elem_t  ep_match;
        ucp_ep_flush_state_t flush_state;
    };

    ucp_ep_ext_control_t *control_ext;
    ucs_hlist_head_t     proto_reqs;
} ucp_ep_ext_gen_t;
/*
 * Protocol-specific endpoint extension, split into a stream part and an
 * active message part; each part holds one list link/head and one queue.
 * NOTE(review): what is queued on each container is not visible from this
 * header - see the stream and active message implementations.
 */
typedef struct {
    struct {
        ucs_list_link_t  ready_list;
        ucs_queue_head_t match_q;
    } stream;

    struct {
        ucs_list_link_t  started_ams;
        ucs_queue_head_t mid_rdesc_q;
    } am;
} ucp_ep_ext_proto_t;
/*
 * Single enumerator at bit 1; bit 0 is not assigned in this enum.
 * NOTE(review): presumably a value of
 * ucp_wireup_sockaddr_data_v1_t::addr_mode - confirm.
 */
enum {
    UCP_WIREUP_SA_DATA_CM_ADDR = UCS_BIT(1)
};
/*
 * Single flag at bit 0.
 * NOTE(review): presumably stored in the low bits of
 * ucp_wireup_sockaddr_data_base_t::header, below
 * UCP_SA_DATA_HEADER_VERSION_SHIFT - confirm.
 */
enum {
    UCP_SA_DATA_FLAG_ERR_MODE_PEER = UCS_BIT(0)
};
/*
 * Common leading part of the sockaddr wireup data; declared with
 * UCS_S_PACKED. It is the first member ("super") of
 * ucp_wireup_sockaddr_data_v1_t.
 * NOTE(review): presumably this is sent over the wire, so its layout must
 * not change - confirm.
 */
typedef struct ucp_wireup_sockaddr_data_base {
    uint64_t ep_id;  /* endpoint id */
    uint8_t  header; /* 8-bit header */
} UCS_S_PACKED ucp_wireup_sockaddr_data_base_t;
/*
 * Version 1 of the sockaddr wireup data: the common base followed by two
 * 8-bit members; declared with UCS_S_PACKED.
 */
typedef struct ucp_wireup_sockaddr_data_v1 {
    ucp_wireup_sockaddr_data_base_t super;     /* must stay first */
    uint8_t                         addr_mode;
    uint8_t                         dev_index;
} UCS_S_PACKED ucp_wireup_sockaddr_data_v1_t;
/*
 * Connection request object. It carries both the UCP and the UCT listener
 * handle, the UCT request handle, a device name of up to
 * UCT_DEVICE_NAME_MAX characters, the client socket address and an endpoint
 * handle.
 * NOTE(review): ownership of remote_dev_addr and the moment "ep" gets set
 * are not visible from this header - confirm against the listener code.
 */
typedef struct ucp_conn_request {
    ucp_listener_h          listener;
    uct_listener_h          uct_listener;
    uct_conn_request_h      uct_req;
    ucp_rsc_index_t         cm_idx;
    char                    dev_name[UCT_DEVICE_NAME_MAX];
    uct_device_addr_t       *remote_dev_addr;
    struct sockaddr_storage client_address;
    ucp_ep_h                ep;
} ucp_conn_request_t;
/*
 * Argument block passed to ucp_ep_discard_lanes() as "discard_arg".
 * NOTE(review): presumably "counter" tracks outstanding discard operations
 * and "status" the status to report - confirm.
 */
typedef struct ucp_ep_discard_lanes_arg {
    unsigned     counter;
    ucs_status_t status;
    ucp_ep_h     ucp_ep;
} ucp_ep_discard_lanes_arg_t;
/*
 * Takes a UCT endpoint handle and returns an int.
 * NOTE(review): presumably non-zero when uct_ep is in the failed state -
 * confirm against the definition.
 */
int
ucp_is_uct_ep_failed(uct_ep_h uct_ep);
/*
 * Configuration key helpers. Definitions are not part of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Modifies *key in place (presumably resets it to an empty state). */
void
ucp_ep_config_key_reset(ucp_ep_config_key_t *key);

/* Writes to buf; key is read-only (presumably describes the CM lane). */
void
ucp_ep_config_cm_lane_info_str(ucp_worker_h worker,
                               const ucp_ep_config_key_t *key,
                               ucp_lane_index_t lane, ucp_rsc_index_t cm_index,
                               ucs_string_buffer_t *buf);

/* Writes to buf; key and addr_indices are read-only (presumably describes
 * a regular lane). */
void
ucp_ep_config_lane_info_str(ucp_worker_h worker,
                            const ucp_ep_config_key_t *key,
                            const unsigned *addr_indices,
                            ucp_lane_index_t lane,
                            ucp_rsc_index_t aux_rsc_index,
                            ucs_string_buffer_t *buf);
/*
 * Endpoint creation, reference counting and destruction. Definitions are
 * not part of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Returns a status; the new endpoint handle is stored to *ep_p. */
ucs_status_t
ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
                   const char *message, ucp_ep_h *ep_p);

/* Presumably takes one reference on ep. */
void
ucp_ep_add_ref(ucp_ep_h ep);

/* Presumably drops one reference on ep; the meaning of the int result
 * (e.g. "last reference released") has to be checked in the definition. */
int
ucp_ep_remove_ref(ucp_ep_h ep);

/* Like ucp_ep_create_base() but additionally takes ep_init_flags. */
ucs_status_t
ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,
                     const char *peer_name, const char *message,
                     ucp_ep_h *ep_p);

void
ucp_ep_delete(ucp_ep_h ep);

void
ucp_ep_release_id(ucp_ep_h ep);

/* Returns a status; a wireup endpoint pointer is stored to *wireup_ep. */
ucs_status_t
ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
                          ucp_wireup_ep_t **wireup_ep);
/*
 * Endpoint creation from an unpacked remote address or from a connection
 * request. Both return a status and store the endpoint handle to *ep_p.
 * NOTE(review): semantics are inferred from names and signatures - confirm.
 */
ucs_status_t ucp_ep_create_to_worker_addr(
        ucp_worker_h worker, const ucp_tl_bitmap_t *local_tl_bitmap,
        const ucp_unpacked_address_t *remote_address, unsigned ep_init_flags,
        const char *message, ucp_ep_h *ep_p);

ucs_status_t
ucp_ep_create_server_accept(ucp_worker_h worker,
                            const ucp_conn_request_h conn_request,
                            ucp_ep_h *ep_p);
/*
 * Returns a status pointer. Takes request flags, request parameters
 * (read-only), a worker request, a completion callback and a name string.
 * NOTE(review): presumably starts a flush on ep and calls flushed_cb when it
 * completes; how a NULL worker_req is treated is not visible here - confirm.
 */
ucs_status_ptr_t
ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
                      const ucp_request_param_t *param,
                      ucp_request_t *worker_req,
                      ucp_request_callback_t flushed_cb,
                      const char *debug_name);
/*
 * NOTE(review): semantics below are inferred from names and signatures -
 * confirm against the definitions.
 */

/* Returns a status; the endpoint handle is stored to *ep_p. */
ucs_status_t ucp_ep_create_sockaddr_aux(
        ucp_worker_h worker, unsigned ep_init_flags,
        const ucp_unpacked_address_t *remote_address, ucp_ep_h *ep_p);

/* Modifies *key (presumably its err_mode member) based on ep_init_flags. */
void
ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
                               unsigned ep_init_flags);
/*
 * Failure handling, disconnect and lane cleanup. Definitions are not part
 * of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* The next two take (pending request, opaque arg); presumably they are
 * pending-purge callbacks. */
void
ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg);

void
ucp_destroyed_ep_pending_purge(uct_pending_req_t *self, void *arg);

void
ucp_ep_disconnected(ucp_ep_h ep, int force);

void
ucp_ep_destroy_internal(ucp_ep_h ep);

/* Returns a status; takes the lane and the failure status. */
ucs_status_t ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
                               ucs_status_t status);

/* Same arguments as ucp_ep_set_failed() but returns nothing (presumably
 * defers the work). */
void
ucp_ep_set_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
                           ucs_status_t status);

void
ucp_ep_unprogress_uct_ep(ucp_ep_h ep, uct_ep_h uct_ep,
                         ucp_rsc_index_t rsc_index);

void
ucp_ep_cleanup_lanes(ucp_ep_h ep);
/*
 * Endpoint configuration management and key comparison. Definitions are not
 * part of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Returns a status; writes *config, reads *key. */
ucs_status_t
ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
                   const ucp_ep_config_key_t *key);

void
ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config);

/* Compares lane1 of key1 with lane2 of key2; both keys are read-only. */
int
ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
                                 ucp_lane_index_t lane1,
                                 const ucp_ep_config_key_t *key2,
                                 ucp_lane_index_t lane2);

/* All inputs are read-only; the result is written through lane_map. */
void
ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
                              const ucp_rsc_index_t *dst_rsc_indices1,
                              const ucp_ep_config_key_t *key2,
                              const ucp_rsc_index_t *dst_rsc_indices2,
                              ucp_lane_index_t *lane_map);

/* Presumably non-zero when both keys are equal. */
int
ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
                       const ucp_ep_config_key_t *key2);

/* Presumably the position of lane inside lanes[]; the value returned when
 * lane is absent has to be checked in the definition. */
int
ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
                                  ucp_lane_index_t lane);

/* Returns a size computed from the IOV count, a registration cost function,
 * the context and a bandwidth value. */
size_t
ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
                                    const ucs_linear_func_t *reg_cost,
                                    const ucp_context_h context,
                                    double bandwidth);
/*
 * Per-worker "mem_type" endpoints: create (returns a status), destroy, and
 * print information to a stdio stream.
 * NOTE(review): what a mem_type endpoint is used for is not visible from
 * this header - confirm against the worker implementation.
 */
ucs_status_t
ucp_worker_mem_type_eps_create(ucp_worker_h worker);

void
ucp_worker_mem_type_eps_destroy(ucp_worker_h worker);

void
ucp_worker_mem_type_eps_print_info(ucp_worker_h worker, FILE *stream);
/*
 * Endpoint queries. Definitions are not part of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm (in particular whether the pointer/handle getters may return
 * NULL).
 */
ucp_wireup_ep_t *
ucp_ep_get_cm_wireup_ep(ucp_ep_h ep);

/* The result is written to *tl_bitmap. */
void
ucp_ep_get_tl_bitmap(ucp_ep_h ep, ucp_tl_bitmap_t *tl_bitmap);

uct_ep_h
ucp_ep_get_cm_uct_ep(ucp_ep_h ep);

int
ucp_ep_is_cm_local_connected(ucp_ep_h ep);

int
ucp_ep_is_local_connected(ucp_ep_h ep);

/* Takes an opaque argument and returns unsigned; presumably a progress
 * callback. */
unsigned
ucp_ep_local_disconnect_progress(void *arg);

size_t
ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config_t *config);
/*
 * Takes a lane count and a rendezvous zcopy configuration that it may
 * modify. Declared exactly once: repeating an identical prototype in the
 * same header is redundant and triggers -Wredundant-decls.
 * NOTE(review): presumably finalizes rndv_zcopy (e.g. its scale[] values)
 * for the first lanes_count lanes - confirm against the definition.
 */
void ucp_ep_config_rndv_zcopy_commit(ucp_lane_index_t lanes_count,
                                     ucp_ep_rndv_zcopy_config_t *rndv_zcopy);

/*
 * Writes to the string buffer lane_info_strb.
 * NOTE(review): presumably a description of the given lane of ucp_ep -
 * confirm against the definition.
 */
void ucp_ep_get_lane_info_str(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
                              ucs_string_buffer_t *lane_info_strb);
/*
 * Error callback invocation and flush internals. Definitions are not part
 * of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */
void
ucp_ep_invoke_err_cb(ucp_ep_h ep, ucs_status_t status);

/* Takes a pending request and returns a status; presumably a pending
 * callback. */
ucs_status_t
ucp_ep_flush_progress_pending(uct_pending_req_t *self);

/* Takes a UCT completion object; presumably a completion callback. */
void
ucp_ep_flush_completion(uct_completion_t *self);

void
ucp_ep_flush_request_ff(ucp_request_t *req, ucs_status_t status);
/*
 * Lane purge/discard helpers and lane lookup. Definitions are not part of
 * this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Takes a purge callback and an opaque argument for it. */
void ucp_ep_purge_lanes(ucp_ep_h ep, uct_pending_purge_callback_t purge_cb,
                        void *purge_arg);

/* Returns unsigned (presumably the number of discarded lanes); takes the
 * status, a callback and a ucp_ep_discard_lanes_arg_t. */
unsigned
ucp_ep_discard_lanes(ucp_ep_h ep, ucs_status_t discard_status,
                     ucp_send_nbx_callback_t discard_cb,
                     ucp_ep_discard_lanes_arg_t *discard_arg);

void
ucp_ep_register_disconnect_progress(ucp_request_t *req);

/* Presumably the lane of ucp_ep that uses uct_ep; the value returned when
 * no lane matches has to be checked in the definition. */
ucp_lane_index_t
ucp_ep_lookup_lane(ucp_ep_h ucp_ep, uct_ep_h uct_ep);
/*
 * Keepalive entry points. Definitions are not part of this header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Returns a status; works on one UCT endpoint of ucp_ep and takes a
 * resource index, flags and a completion object. */
ucs_status_t
ucp_ep_do_uct_ep_keepalive(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
                           ucp_rsc_index_t rsc_idx, unsigned flags,
                           uct_completion_t *comp);

/* Takes the endpoint and a time value; the meaning of the int result has to
 * be checked in the definition. */
int
ucp_ep_do_keepalive(ucp_ep_h ep, ucs_time_t now);
/*
 * Request purge and VFS registration. Definitions are not part of this
 * header.
 * NOTE(review): the purpose remarks are inferred from names and signatures -
 * confirm.
 */

/* Works on a single request; takes a status and a "recursive" switch. */
void
ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req, ucs_status_t status,
                 int recursive);

/* Takes only the endpoint and a status (presumably purges all of its
 * requests). */
void
ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status);

void
ucp_ep_vfs_init(ucp_ep_h ep);
#endif