#ifndef UCT_TCP_MD_H
#define UCT_TCP_MD_H
#include "tcp_base.h"
#include <uct/base/uct_md.h>
#include <uct/base/uct_iface.h>
#include <uct/base/uct_iov.inl>
#include <ucs/sys/sock.h>
#include <ucs/sys/string.h>
#include <ucs/datastruct/conn_match.h>
#include <ucs/datastruct/ptr_map.inl>
#include <ucs/algorithm/crc.h>
#include <ucs/sys/event_set.h>
#include <ucs/sys/iovec.h>
#include <net/if.h>
/* Transport name and the prefix of its configuration variables. */
#define UCT_TCP_NAME "tcp"
#define UCT_TCP_CONFIG_PREFIX "TCP_"

/* Magic value, presumably exchanged while an ep is in the
 * UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER state — TODO confirm.
 * Uppercase "UL" suffix per CERT DCL16-C (a lowercase 'l' reads like '1');
 * value and resulting type are identical to the former "lu" spelling. */
#define UCT_TCP_MAGIC_NUMBER 0xCAFEBABE12345678UL

/* Presumably the maximal number of events handled per event-set wait. */
#define UCT_TCP_MAX_EVENTS 16

/* Buffer size for uct_tcp_ep_ctx_caps_str() output, presumably. */
#define UCT_TCP_EP_CTX_CAPS_STR_MAX 8

/* Number of extra ("service") iov entries, presumably reserved for headers in
 * zcopy and short-iov sends — confirm in the send paths. */
#define UCT_TCP_EP_ZCOPY_SERVICE_IOV_COUNT 2
#define UCT_TCP_EP_AM_SHORTV_IOV_COUNT 3

/* No transport-level limit on PUT zcopy length. */
#define UCT_TCP_EP_PUT_ZCOPY_MAX SIZE_MAX

/* Bytes of protocol overhead for a PUT: AM header plus PUT request header. */
#define UCT_TCP_EP_PUT_SERVICE_LENGTH (sizeof(uct_tcp_am_hdr_t) + \
                                       sizeof(uct_tcp_ep_put_req_hdr_t))

/* Name of the configuration key for connection retries. */
#define UCT_TCP_CONFIG_MAX_CONN_RETRIES "MAX_CONN_RETRIES"

/* Mask of all endpoint context-type (TX/RX) flag bits. */
#define UCT_TCP_EP_CTX_CAPS (UCT_TCP_EP_FLAG_CTX_TYPE_TX | \
                             UCT_TCP_EP_FLAG_CTX_TYPE_RX)

/* Largest connection sequence number. */
#define UCT_TCP_CM_CONN_SN_MAX UINT64_MAX

/* Default keepalive idle time and probe interval; units are not visible in
 * this header (presumably seconds) — TODO confirm. */
#define UCT_TCP_EP_DEFAULT_KEEPALIVE_IDLE 10
#define UCT_TCP_EP_DEFAULT_KEEPALIVE_INTVL 2
/* Connection-manager identifier of an endpoint: either a connection sequence
 * number or a pointer-map key. Which member is valid is decided by the code
 * that fills it (presumably tied to UCT_TCP_EP_FLAG_CONNECT_TO_EP — confirm). */
typedef union uct_tcp_ep_cm_id {
    ucs_conn_sn_t conn_sn;          /* Connection sequence number */
    ucs_ptr_map_key_t ptr_map_key;  /* Key into the iface ep pointer map */
} uct_tcp_ep_cm_id_t;
/* Endpoint flags, stored in uct_tcp_ep_t::flags. That field is a uint16_t,
 * so no more than 16 bits may be defined here. The two CTX_TYPE bits form
 * the UCT_TCP_EP_CTX_CAPS mask. Per-flag notes below are inferred from the
 * names — confirm against the .c implementation. */
enum {
    UCT_TCP_EP_FLAG_CTX_TYPE_TX = UCS_BIT(0),          /* ep has a TX context */
    UCT_TCP_EP_FLAG_CTX_TYPE_RX = UCS_BIT(1),          /* ep has an RX context */
    UCT_TCP_EP_FLAG_ZCOPY_TX = UCS_BIT(2),             /* presumably: zcopy send in progress */
    UCT_TCP_EP_FLAG_PUT_RX = UCS_BIT(3),               /* presumably: receiving PUT data */
    UCT_TCP_EP_FLAG_PUT_TX_WAITING_ACK = UCS_BIT(4),   /* presumably: PUT sent, ACK pending */
    UCT_TCP_EP_FLAG_PUT_RX_SENDING_ACK = UCS_BIT(5),   /* presumably: PUT ACK to be sent */
    UCT_TCP_EP_FLAG_ON_MATCH_CTX = UCS_BIT(6),         /* presumably: linked into iface->conn_match_ctx */
    UCT_TCP_EP_FLAG_FAILED = UCS_BIT(7),               /* presumably: set by uct_tcp_ep_set_failed() */
    UCT_TCP_EP_FLAG_CONNECT_TO_EP = UCS_BIT(8),        /* presumably: created via connect_to_ep */
    UCT_TCP_EP_FLAG_ON_PTR_MAP = UCS_BIT(9),           /* presumably: stored in iface->ep_ptr_map */
    UCT_TCP_EP_FLAG_NEED_FLUSH = UCS_BIT(10)           /* presumably: flush required before completion */
};
/* Connection state of an endpoint. Stored narrowed in uct_tcp_ep_t::conn_state
 * (a uint8_t). The state name/progress callbacks live in uct_tcp_ep_cm_state[],
 * presumably indexed by these values — TODO confirm. */
typedef enum uct_tcp_ep_conn_state {
    UCT_TCP_EP_CONN_STATE_CLOSED,
    UCT_TCP_EP_CONN_STATE_CONNECTING,
    UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER,
    UCT_TCP_EP_CONN_STATE_ACCEPTING,
    UCT_TCP_EP_CONN_STATE_WAITING_ACK,
    UCT_TCP_EP_CONN_STATE_CONNECTED
} uct_tcp_ep_conn_state_t;
/* Forward declaration; struct uct_tcp_ep is defined further below. */
typedef struct uct_tcp_ep uct_tcp_ep_t;

/* Endpoint progress callback (an alias of ucs_callback_t). */
typedef ucs_callback_t uct_tcp_ep_progress_t;

/* Descriptor of one connection state: printable name plus the TX and RX
 * progress callbacks used in that state. */
typedef struct uct_tcp_cm_state {
    const char *name;
    uct_tcp_ep_progress_t tx_progress;
    uct_tcp_ep_progress_t rx_progress;
} uct_tcp_cm_state_t;
/* Connection-manager events, defined as bits so they can be combined;
 * ACK_WITH_REQ is exactly the union of the REQ and ACK bits. */
typedef enum uct_tcp_cm_conn_event {
    UCT_TCP_CM_CONN_REQ = UCS_BIT(0),
    UCT_TCP_CM_CONN_ACK = UCS_BIT(1),
    UCT_TCP_CM_CONN_ACK_WITH_REQ = (UCT_TCP_CM_CONN_REQ |
                                    UCT_TCP_CM_CONN_ACK)
} uct_tcp_cm_conn_event_t;
/* Flags carried in uct_tcp_cm_conn_req_pkt_t::flags (a uint8_t). */
enum {
    UCT_TCP_CM_CONN_REQ_PKT_FLAG_CONNECT_TO_EP = UCS_BIT(0)
};

/* Connection-request packet, declared UCS_S_PACKED (presumably the layout sent
 * between peers; see uct_tcp_cm_handle_conn_pkt()).
 * NOTE(review): `event` has an enum type whose size is implementation-defined,
 * so the packed size depends on the compiler ABI — confirm both peers agree. */
typedef struct uct_tcp_cm_conn_req_pkt {
    uct_tcp_cm_conn_event_t event;  /* UCT_TCP_CM_CONN_* event bits */
    uint8_t flags;                  /* UCT_TCP_CM_CONN_REQ_PKT_FLAG_* bits */
    uct_tcp_ep_cm_id_t cm_id;       /* Sender's connection-manager id */
} UCS_S_PACKED uct_tcp_cm_conn_req_pkt_t;
/* Packed active-message header: AM id followed by the payload length.
 * Its size is part of UCT_TCP_EP_PUT_SERVICE_LENGTH. */
typedef struct uct_tcp_am_hdr {
    uint8_t am_id;    /* User AM id or one of uct_tcp_ep_am_id_t */
    uint32_t length;  /* Payload length in bytes, presumably excluding this header */
} UCS_S_PACKED uct_tcp_am_hdr_t;
/* Internal AM ids, numbered from UCT_AM_ID_MAX upward so they sit after the
 * user AM id range. They are carried in uct_tcp_am_hdr_t::am_id (uint8_t), so
 * UCT_AM_ID_MAX + 3 must fit in 8 bits. */
typedef enum uct_tcp_ep_am_id {
    UCT_TCP_EP_CM_AM_ID = UCT_AM_ID_MAX,             /* Connection-manager packet */
    UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,    /* PUT request header */
    UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2,    /* PUT acknowledgement */
    UCT_TCP_EP_KEEPALIVE_AM_ID = UCT_AM_ID_MAX + 3   /* Keepalive message */
} uct_tcp_ep_am_id_t;
/* Packed PUT request header.
 * NOTE(review): `length` is size_t, whose width differs between 32- and
 * 64-bit builds, so this header is not layout-compatible across such peers —
 * confirm whether that is intended. */
typedef struct uct_tcp_ep_put_req_hdr {
    uint64_t addr;    /* Remote destination address */
    size_t length;    /* Number of bytes to write */
    uint32_t sn;      /* PUT sequence number */
} UCS_S_PACKED uct_tcp_ep_put_req_hdr_t;

/* Packed PUT acknowledgement header. */
typedef struct uct_tcp_ep_put_ack_hdr {
    uint32_t sn;      /* Sequence number being acknowledged */
} UCS_S_PACKED uct_tcp_ep_put_ack_hdr_t;
/* Deferred PUT completion: `comp` is presumably completed once the PUT with
 * sequence number `wait_put_sn` is acknowledged. Presumably queued on
 * uct_tcp_ep_t::put_comp_q through `elem` — TODO confirm. */
typedef struct uct_tcp_ep_put_completion {
    uct_completion_t *comp;
    uint32_t wait_put_sn;
    ucs_queue_elem_t elem;
} uct_tcp_ep_put_completion_t;
/* Per-direction (TX or RX) buffer state of an endpoint. The context counts as
 * empty when length == 0; a non-empty context must have a non-NULL buf (this
 * is asserted by uct_tcp_ep_ctx_buf_empty()). */
typedef struct uct_tcp_ep_ctx {
    uint32_t put_sn;  /* PUT sequence number for this direction */
    void *buf;        /* Data buffer */
    size_t length;    /* Bytes of valid data in buf */
    size_t offset;    /* Progress within buf, presumably bytes already sent/consumed */
} uct_tcp_ep_ctx_t;
/* State of a zero-copy send. It starts with the AM header and ends with a
 * trailing iov array whose length is chosen at allocation time. */
typedef struct uct_tcp_ep_zcopy_tx {
    uct_tcp_am_hdr_t super;  /* AM header; kept first */
    uct_completion_t *comp;  /* User completion, may presumably be NULL */
    size_t iov_index;        /* Index of the next iov entry to send */
    size_t iov_cnt;          /* Number of valid entries in iov[] */
    struct iovec iov[0];     /* Trailing iov storage */
} uct_tcp_ep_zcopy_tx_t;
/* Flags stored in uct_tcp_device_addr_t::flags. */
typedef enum uct_tcp_device_addr_flags {
    UCT_TCP_DEVICE_ADDR_FLAG_LOOPBACK = UCS_BIT(0)  /* presumably: device is loopback */
} uct_tcp_device_addr_flags_t;

/* Packed device-address header. Only flags and the address family are
 * declared here; any address bytes are presumably appended after it by the
 * packing code — TODO confirm. */
typedef struct uct_tcp_device_addr {
    uint8_t flags;      /* uct_tcp_device_addr_flags_t bits */
    uint8_t sa_family;  /* AF_* value narrowed to 8 bits */
} UCS_S_PACKED uct_tcp_device_addr_t;

/* Packed interface address. */
typedef struct uct_tcp_iface_addr {
    uint16_t port;  /* Port number; byte order not visible here — TODO confirm */
} UCS_S_PACKED uct_tcp_iface_addr_t;

/* Packed endpoint address: interface address plus the pointer-map key of the
 * endpoint (presumably consumed by uct_tcp_ep_connect_to_ep()). */
typedef struct uct_tcp_ep_addr {
    uct_tcp_iface_addr_t iface_addr;
    ucs_ptr_map_key_t ptr_map_key;
} UCS_S_PACKED uct_tcp_ep_addr_t;
/* TCP endpoint. The trailing zero-length peer_addr[] means the allocation
 * size decides how much peer-address storage is available (presumably sized
 * in uct_tcp_ep_init() — TODO confirm). */
struct uct_tcp_ep {
    uct_base_ep_t super;           /* Base UCT ep, kept first (presumably for casting) */
    uint8_t conn_retries;          /* Connection attempts made so far, presumably */
    uint8_t conn_state;            /* uct_tcp_ep_conn_state_t value, narrowed to 8 bits */
    ucs_event_set_types_t events;  /* Event types registered for fd (see uct_tcp_ep_mod_events) */
    uint16_t flags;                /* UCT_TCP_EP_FLAG_* bits */
    int fd;                        /* Socket descriptor */
    int stale_fd;                  /* NOTE(review): role not visible in this header */
    uct_tcp_ep_cm_id_t cm_id;      /* Connection-manager id */
    uct_tcp_ep_ctx_t tx;           /* Transmit buffer state */
    uct_tcp_ep_ctx_t rx;           /* Receive buffer state */
    ucs_queue_head_t pending_q;    /* Pending requests (uct_tcp_ep_pending_add) */
    ucs_queue_head_t put_comp_q;   /* Presumably uct_tcp_ep_put_completion_t entries */
    /* The ep is linked into at most one of these containers at a time, since
     * both links share storage. */
    union {
        ucs_list_link_t list;        /* Presumably a link in iface->ep_list */
        ucs_conn_match_elem_t elem;  /* Presumably an element of iface->conn_match_ctx */
    };
    char peer_addr[0];             /* Trailing peer address storage */
};
/* Pointer-map type used for uct_tcp_iface_t::ep_ptr_map. */
UCS_PTR_MAP_DEFINE(tcp_ep, 0);

/* TCP interface. */
typedef struct uct_tcp_iface {
    uct_base_iface_t super;             /* Base UCT interface, kept first */
    int listen_fd;                      /* Listening socket */
    ucs_conn_match_ctx_t conn_match_ctx; /* Connection-matching context for eps */
    UCS_PTR_MAP_T(tcp_ep) ep_ptr_map;   /* Map from ptr_map_key to ep */
    ucs_list_link_t ep_list;            /* List of endpoints */
    char if_name[IFNAMSIZ];             /* Network interface name */
    ucs_sys_event_set_t *event_set;     /* Event set the ep fds are registered in, presumably */
    ucs_mpool_t tx_mpool;               /* TX buffer pool */
    ucs_mpool_t rx_mpool;               /* RX buffer pool */
    size_t outstanding;                 /* Counter kept by uct_tcp_iface_outstanding_inc/dec */
    ucs_range_spec_t port_range;        /* Allowed local port range */
    struct {
        size_t tx_seg_size;
        size_t rx_seg_size;
        size_t sendv_thresh;
        size_t max_iov;
        struct {
            size_t max_hdr;
            size_t hdr_offset;
        } zcopy;
        struct sockaddr_storage ifaddr;   /* Interface address */
        struct sockaddr_storage netmask;  /* Interface netmask */
        size_t sockaddr_len;              /* Length of the address in ifaddr */
        int prefer_default;
        int put_enable;
        int conn_nb;
        unsigned max_poll;
        /* NOTE(review): uint8_t here, but unsigned in uct_tcp_iface_config_t;
         * confirm the value is range-checked before narrowing. */
        uint8_t max_conn_retries;
        unsigned syn_cnt;
        struct {
            ucs_time_t idle;
            unsigned long cnt;
            ucs_time_t intvl;
        } keepalive;
    } config;
    /* Socket options applied to connections, presumably by
     * uct_tcp_iface_set_sockopt(). */
    struct {
        int nodelay;
        size_t sndbuf;
        size_t rcvbuf;
    } sockopt;
} uct_tcp_iface_t;
/* User-visible interface configuration. Most fields appear to mirror
 * uct_tcp_iface_t::config; the copy happens outside this header. */
typedef struct uct_tcp_iface_config {
    uct_iface_config_t super;
    size_t tx_seg_size;
    size_t rx_seg_size;
    size_t max_iov;
    size_t sendv_thresh;
    int prefer_default;
    int put_enable;
    int conn_nb;
    unsigned max_poll;
    unsigned max_conn_retries;            /* Stored as uint8_t in the iface */
    int sockopt_nodelay;
    uct_tcp_send_recv_buf_config_t sockopt; /* Socket send/receive buffer sizes */
    unsigned syn_cnt;
    uct_iface_mpool_config_t tx_mpool;
    uct_iface_mpool_config_t rx_mpool;
    ucs_range_spec_t port_range;
    struct {
        ucs_time_t idle;
        unsigned long cnt;
        ucs_time_t intvl;
    } keepalive;
} uct_tcp_iface_config_t;
/* TCP memory domain. */
typedef struct uct_tcp_md {
    uct_md_t super;
    struct {
        int af_prio_count;            /* Valid entries in af_prio_list (at most 2) */
        sa_family_t af_prio_list[2];  /* Address families in priority order */
    } config;
} uct_tcp_md_t;

/* TCP memory-domain configuration. */
typedef struct uct_tcp_md_config {
    uct_md_config_t super;
    UCS_CONFIG_STRING_ARRAY_FIELD(af) af_prio;  /* Address-family priority strings */
} uct_tcp_md_config_t;
/* Internal pending request. The `cm` fields match the event/log_error
 * arguments of uct_tcp_cm_send_event(); presumably dispatched by
 * uct_tcp_cm_send_event_pending_cb(). */
typedef struct uct_tcp_ep_pending_req {
    uct_pending_req_t super;
    uct_tcp_ep_t *ep;
    struct {
        uct_tcp_cm_conn_event_t event;
        uint8_t log_error;
    } cm;
} uct_tcp_ep_pending_req_t;

/* Bundles the cb/arg pair given to uct_tcp_ep_pending_purge(). */
typedef struct uct_tcp_ep_pending_purge_arg {
    uct_pending_purge_callback_t cb;
    void *arg;
} uct_tcp_ep_pending_purge_arg_t;
/* Global objects defined in the TCP transport sources. */
extern uct_component_t uct_tcp_component;
extern const char *uct_tcp_address_type_names[];
/* Per-connection-state descriptors, presumably indexed by uct_tcp_ep_conn_state_t. */
extern const uct_tcp_cm_state_t uct_tcp_ep_cm_state[];
extern const ucs_conn_match_ops_t uct_tcp_cm_conn_match_ops;
/* RX progress callbacks; indexing scheme not visible here — TODO confirm. */
extern const uct_tcp_ep_progress_t uct_tcp_ep_progress_rx_cb[];
/* Network-interface and interface-level helpers (defined in the .c files).
 * Parameters ending in `_p` appear to be output parameters — confirm in the
 * definitions. */
ucs_status_t uct_tcp_netif_caps(const char *if_name, double *latency_p,
                                double *bandwidth_p);
ucs_status_t uct_tcp_netif_is_default(const char *if_name, int *result_p);
int uct_tcp_sockaddr_cmp(const struct sockaddr *sa1,
                         const struct sockaddr *sa2);
ucs_status_t uct_tcp_iface_set_sockopt(uct_tcp_iface_t *iface, int fd,
                                       int set_nb);
size_t uct_tcp_iface_get_max_iov(const uct_tcp_iface_t *iface);
size_t uct_tcp_iface_get_max_zcopy_header(const uct_tcp_iface_t *iface);
void uct_tcp_iface_add_ep(uct_tcp_ep_t *ep);
void uct_tcp_iface_remove_ep(uct_tcp_ep_t *ep);
int uct_tcp_cm_ep_accept_conn(uct_tcp_ep_t *ep);
int uct_tcp_iface_is_self_addr(uct_tcp_iface_t *iface,
                               const struct sockaddr *peer_addr);
/* Endpoint lifecycle, addressing and context-capability helpers. */
ucs_status_t uct_tcp_ep_handle_io_err(uct_tcp_ep_t *ep, const char *op_str,
                                      ucs_status_t io_status);
ucs_status_t uct_tcp_ep_init(uct_tcp_iface_t *iface, int fd,
                             const struct sockaddr *dest_addr,
                             uct_tcp_ep_t **ep_p);
ucs_status_t uct_tcp_ep_set_dest_addr(const uct_device_addr_t *dev_addr,
                                      const uct_iface_addr_t *iface_addr,
                                      struct sockaddr *dest_addr);
uint64_t uct_tcp_ep_get_cm_id(const uct_tcp_ep_t *ep);
ucs_status_t uct_tcp_ep_create(const uct_ep_params_t *params,
                               uct_ep_h *ep_p);
ucs_status_t uct_tcp_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *ep_addr);
ucs_status_t uct_tcp_ep_connect_to_ep(uct_ep_h ep,
                                      const uct_device_addr_t *dev_addr,
                                      const uct_ep_addr_t *ep_addr);
/* NOTE(review): takes caps as uint8_t while the other ctx-cap helpers use
 * uint16_t; harmless for UCT_TCP_EP_CTX_CAPS (bits 0-1). str_buffer is
 * presumably at least UCT_TCP_EP_CTX_CAPS_STR_MAX bytes — confirm. */
const char *uct_tcp_ep_ctx_caps_str(uint8_t ep_ctx_caps, char *str_buffer);
void uct_tcp_ep_change_ctx_caps(uct_tcp_ep_t *ep, uint16_t new_caps);
void uct_tcp_ep_add_ctx_cap(uct_tcp_ep_t *ep, uint16_t cap);
void uct_tcp_ep_remove_ctx_cap(uct_tcp_ep_t *ep, uint16_t cap);
/* Caution: argument order is (from, to) here but (to, from) in
 * uct_tcp_ep_replace_ep(). */
void uct_tcp_ep_move_ctx_cap(uct_tcp_ep_t *from_ep, uct_tcp_ep_t *to_ep,
                             uint16_t ctx_cap);
void uct_tcp_ep_destroy_internal(uct_ep_h tl_ep);
void uct_tcp_ep_destroy(uct_ep_h tl_ep);
void uct_tcp_ep_set_failed(uct_tcp_ep_t *ep);
void uct_tcp_ep_replace_ep(uct_tcp_ep_t *to_ep, uct_tcp_ep_t *from_ep);
int uct_tcp_ep_is_self(const uct_tcp_ep_t *ep);
uct_tcp_ep_t* uct_tcp_ep_ptr_map_retrieve(uct_tcp_iface_t *iface,
                                          ucs_ptr_map_key_t ptr_map_key);
void uct_tcp_ep_remove(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);
void uct_tcp_ep_add(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);
void uct_tcp_ep_mod_events(uct_tcp_ep_t *ep, ucs_event_set_types_t add,
                           ucs_event_set_types_t rem);
void uct_tcp_ep_pending_queue_dispatch(uct_tcp_ep_t *ep);
/* Endpoint operation entry points (AM sends, PUT, pending queue, flush and
 * check). Signatures presumably follow the generic UCT ep operation table. */
ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header,
                                 const void *payload, unsigned length);
ucs_status_t uct_tcp_ep_am_short_iov(uct_ep_h uct_ep, uint8_t am_id,
                                     const uct_iov_t *iov, size_t iovcnt);
ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
                            uct_pack_callback_t pack_cb, void *arg,
                            unsigned flags);
ucs_status_t uct_tcp_ep_am_zcopy(uct_ep_h uct_ep, uint8_t am_id, const void *header,
                                 unsigned header_length, const uct_iov_t *iov,
                                 size_t iovcnt, unsigned flags,
                                 uct_completion_t *comp);
ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
                                  size_t iovcnt, uint64_t remote_addr,
                                  uct_rkey_t rkey, uct_completion_t *comp);
ucs_status_t uct_tcp_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
                                    unsigned flags);
void uct_tcp_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                              void *arg);
ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
                              uct_completion_t *comp);
ucs_status_t
uct_tcp_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp);
/* Connection-manager helpers: event sending, packet handling, connection
 * state transitions and ep lookup/insertion. */
ucs_status_t uct_tcp_cm_send_event_pending_cb(uct_pending_req_t *self);
/* NOTE(review): log_error is int here but uint8_t in
 * uct_tcp_ep_pending_req_t::cm — values are narrowed when deferred. */
ucs_status_t uct_tcp_cm_send_event(uct_tcp_ep_t *ep,
                                   uct_tcp_cm_conn_event_t event,
                                   int log_error);
unsigned uct_tcp_cm_handle_conn_pkt(uct_tcp_ep_t **ep_p, void *pkt,
                                    uint32_t length);
unsigned uct_tcp_cm_conn_progress(void *arg);
uct_tcp_ep_conn_state_t
uct_tcp_cm_set_conn_state(uct_tcp_ep_t *ep,
                          uct_tcp_ep_conn_state_t new_conn_state);
void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
                                  uct_tcp_ep_conn_state_t new_conn_state);
void uct_tcp_cm_ep_set_conn_sn(uct_tcp_ep_t *ep);
uct_tcp_ep_t *uct_tcp_cm_get_ep(uct_tcp_iface_t *iface,
                                const struct sockaddr *dest_address,
                                ucs_conn_sn_t conn_sn, uint8_t with_ctx_cap);
void uct_tcp_cm_insert_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);
void uct_tcp_cm_remove_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);
ucs_status_t uct_tcp_cm_handle_incoming_conn(uct_tcp_iface_t *iface,
                                             const struct sockaddr *peer_addr,
                                             int fd);
ucs_status_t uct_tcp_cm_conn_start(uct_tcp_ep_t *ep);
int uct_tcp_keepalive_is_enabled(uct_tcp_iface_t *iface);
static UCS_F_ALWAYS_INLINE int uct_tcp_ep_ctx_buf_empty(uct_tcp_ep_ctx_t *ctx)
{
ucs_assert((ctx->length == 0) || (ctx->buf != NULL));
return ctx->length == 0;
}
/* Bump the interface's `outstanding` counter by one. */
static inline void uct_tcp_iface_outstanding_inc(uct_tcp_iface_t *iface)
{
    ++iface->outstanding;
}

/* Drop the interface's `outstanding` counter by one; the assert guards
 * against decrementing past zero. */
static inline void uct_tcp_iface_outstanding_dec(uct_tcp_iface_t *iface)
{
    ucs_assert(iface->outstanding > 0);
    iface->outstanding -= 1;
}
/* Query TCP-capable devices for the given memory domain. The `_p` parameters
 * appear to be outputs (array and count) — ownership of *devices_p is not
 * visible here; TODO confirm who frees it. */
ucs_status_t uct_tcp_query_devices(uct_md_h md,
                                   uct_tl_device_resource_t **devices_p,
                                   unsigned *num_devices_p);
#endif