#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "tcp_sockcm_ep.h"
#include <ucs/async/async.h>
#include <ucs/sys/sock.h>
/*
 * Configuration field table for the TCP sockcm connection manager.
 * Each entry binds a configuration name to a member of
 * uct_tcp_sockcm_config_t through ucs_offsetof().
 */
ucs_config_field_t uct_tcp_sockcm_config_table[] = {
/* Nested table: the fields of uct_cm_config_table are stored in the "super"
 * member and are registered under the name "CM_". */
{"CM_", "", NULL,
ucs_offsetof(uct_tcp_sockcm_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_cm_config_table)},
/* Private data length; default string is "2048", parsed as a memory-units
 * value into priv_data_len. */
{"PRIV_DATA_LEN", "2048",
"TCP CM private data length",
ucs_offsetof(uct_tcp_sockcm_config_t, priv_data_len), UCS_CONFIG_TYPE_MEMUNITS},
/* Entries produced by macros defined outside this file; they are given the
 * offsets of the "sockopt" and "syn_cnt" members respectively. */
UCT_TCP_SEND_RECV_BUF_FIELDS(ucs_offsetof(uct_tcp_sockcm_config_t, sockopt)),
UCT_TCP_SYN_CNT(ucs_offsetof(uct_tcp_sockcm_config_t, syn_cnt)),
/* NULL-named entry closing the table */
{NULL}
};
/**
 * Report the attributes of a TCP sockcm connection manager.
 *
 * Only the fields selected in cm_attr->field_mask are filled in. The single
 * attribute handled here is the maximal connection private data length,
 * which is copied from the connection manager object.
 *
 * @param [in]     cm       Connection manager handle (a uct_tcp_sockcm_t).
 * @param [in,out] cm_attr  Attributes structure; field_mask selects what to
 *                          fill.
 *
 * @return Always UCS_OK.
 */
static ucs_status_t uct_tcp_sockcm_query(uct_cm_h cm, uct_cm_attr_t *cm_attr)
{
    uct_tcp_sockcm_t *sockcm = ucs_derived_of(cm, uct_tcp_sockcm_t);

    /* Nothing was requested that this function knows how to provide */
    if (!(cm_attr->field_mask & UCT_CM_ATTR_FIELD_MAX_CONN_PRIV)) {
        return UCS_OK;
    }

    cm_attr->max_conn_priv = sockcm->priv_data_len;
    return UCS_OK;
}
/*
 * Connection manager operations of the TCP sockcm. This table is handed to
 * the uct_cm_t super class in the uct_tcp_sockcm_t init function.
 */
static uct_cm_ops_t uct_tcp_sockcm_ops = {
/* Closing the CM deletes the uct_tcp_sockcm_t object */
.close = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_t),
.cm_query = uct_tcp_sockcm_query,
/* Listener operations are served by the uct_tcp_listener_t class */
.listener_create = UCS_CLASS_NEW_FUNC_NAME(uct_tcp_listener_t),
.listener_reject = uct_tcp_listener_reject,
.listener_query = uct_tcp_listener_query,
.listener_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_listener_t),
.ep_create = uct_tcp_sockcm_ep_create
};
/**
 * Translate the pending error of a socket into a UCS status and a log level.
 *
 * The pending error is read with SO_ERROR. Known errno values are mapped as
 * follows:
 *  - ECONNREFUSED                          -> UCS_ERR_NOT_CONNECTED    (debug)
 *  - EPIPE, ECONNRESET                     -> UCS_ERR_CONNECTION_RESET (debug)
 *  - ENETUNREACH, EHOSTUNREACH, ETIMEDOUT  -> UCS_ERR_UNREACHABLE      (diag)
 *
 * Any other value, or a failure to read SO_ERROR, is logged as an error and
 * reported as UCS_ERR_IO_ERROR with the log level set to error.
 *
 * @param [in]  fd         Socket file descriptor that got the error event.
 * @param [out] log_level  Log level the caller should use for its own message.
 *
 * @return The UCS status that corresponds to the socket error.
 */
static ucs_status_t uct_tcp_sockcm_event_err_to_ucs_err_log(int fd,
                                                            ucs_log_level_t* log_level)
{
    int so_error = 0;

    if (ucs_socket_getopt(fd, SOL_SOCKET, SO_ERROR, (void*)&so_error,
                          sizeof(so_error)) == UCS_OK) {
        ucs_debug("error event on fd %d: %s", fd, strerror(so_error));

        if (so_error == ECONNREFUSED) {
            *log_level = UCS_LOG_LEVEL_DEBUG;
            return UCS_ERR_NOT_CONNECTED;
        }

        if ((so_error == EPIPE) || (so_error == ECONNRESET)) {
            *log_level = UCS_LOG_LEVEL_DEBUG;
            return UCS_ERR_CONNECTION_RESET;
        }

        if ((so_error == ENETUNREACH) || (so_error == EHOSTUNREACH) ||
            (so_error == ETIMEDOUT)) {
            *log_level = UCS_LOG_LEVEL_DIAG;
            return UCS_ERR_UNREACHABLE;
        }

        /* unrecognized socket error - fall through to the generic failure */
    }

    /* Reached when SO_ERROR could not be read (so_error keeps its initial
     * value of 0) or when the error value has no dedicated mapping */
    *log_level = UCS_LOG_LEVEL_ERROR;
    ucs_error("error event on fd %d: %s", fd, strerror(so_error));
    return UCS_ERR_IO_ERROR;
}
/**
 * Event handler for the socket of a TCP sockcm endpoint.
 *
 * @a events is a bit mask, so more than one event type may be reported by a
 * single invocation:
 *  - UCS_EVENT_SET_EVERR: the socket error is translated to a UCS status,
 *    logged, and passed to uct_tcp_sockcm_ep_handle_event_status(). No further
 *    events are processed.
 *  - UCS_EVENT_SET_EVREAD: uct_tcp_sockcm_ep_recv() is called.
 *  - UCS_EVENT_SET_EVWRITE: uct_tcp_sockcm_ep_send() is called.
 *
 * Read and write are handled independently, read first. A failure in either
 * one is passed to uct_tcp_sockcm_ep_handle_event_status() and ends the
 * handling of this invocation.
 *
 * @param [in] fd      File descriptor the events were reported on; expected to
 *                     be equal to the endpoint's fd.
 * @param [in] events  Bit mask of the reported event types.
 * @param [in] arg     The uct_tcp_sockcm_ep_t the fd belongs to.
 */
void uct_tcp_sa_data_handler(int fd, ucs_event_set_types_t events, void *arg)
{
    uct_tcp_sockcm_ep_t *ep = (uct_tcp_sockcm_ep_t*)arg;
    ucs_log_level_t log_level;
    ucs_status_t status;

    ucs_assertv(ep->fd == fd, "ep->fd %d fd %d, ep_state %d", ep->fd, fd, ep->state);

    ucs_trace("ep %p on %s received event 0x%x (state = %d)", ep,
              (ep->state & UCT_TCP_SOCKCM_EP_ON_SERVER) ? "server" : "client",
              events, ep->state);

    if (events & UCS_EVENT_SET_EVERR) {
        status = uct_tcp_sockcm_event_err_to_ucs_err_log(fd, &log_level);
        ucs_log(log_level, "error event on %s ep %p (status=%s state=%d) events=%d",
                (ep->state & UCT_TCP_SOCKCM_EP_ON_SERVER) ? "server" : "client",
                ep, ucs_status_string(status), ep->state, events);
        uct_tcp_sockcm_ep_handle_event_status(ep, status, events, "event set error");
        return;
    }

    /* Handle a READ event first, so that incoming data is consumed before
     * anything is sent */
    if (events & UCS_EVENT_SET_EVREAD) {
        status = uct_tcp_sockcm_ep_recv(ep);
        if (status != UCS_OK) {
            uct_tcp_sockcm_ep_handle_event_status(ep, status, events, "failed to receive");
            return;
        }
    }

    /* Not an 'else': READ and WRITE may be reported together, and the WRITE
     * event must not be dropped when that happens */
    if (events & UCS_EVENT_SET_EVWRITE) {
        status = uct_tcp_sockcm_ep_send(ep);
        if (status != UCS_OK) {
            uct_tcp_sockcm_ep_handle_event_status(ep, status, events, "failed to send");
            return;
        }
    }
}
/**
 * Query the socket addresses of a TCP sockcm endpoint.
 *
 * Only the fields selected in ep_attr->field_mask are filled in; fields that
 * were not requested are left untouched:
 *  - UCT_EP_ATTR_FIELD_LOCAL_SOCKADDR:  local_address, from
 *    ucs_socket_getname() on the endpoint's fd.
 *  - UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR: remote_address, from
 *    ucs_socket_getpeername() on the endpoint's fd.
 *
 * @param [in]     ep       Endpoint handle (a uct_tcp_sockcm_ep_t).
 * @param [in,out] ep_attr  Attributes structure; field_mask selects what to
 *                          fill.
 *
 * @return UCS_OK on success, otherwise the status returned by the failing
 *         socket query.
 */
ucs_status_t uct_tcp_sockcm_ep_query(uct_ep_h ep, uct_ep_attr_t *ep_attr)
{
    uct_tcp_sockcm_ep_t *cep = ucs_derived_of(ep, uct_tcp_sockcm_ep_t);
    ucs_status_t status;
    socklen_t addr_len; /* required by the helpers, its value is not used */

    if (ep_attr->field_mask & UCT_EP_ATTR_FIELD_LOCAL_SOCKADDR) {
        status = ucs_socket_getname(cep->fd, &ep_attr->local_address,
                                    &addr_len);
        if (status != UCS_OK) {
            return status;
        }
    }

    if (ep_attr->field_mask & UCT_EP_ATTR_FIELD_REMOTE_SOCKADDR) {
        status = ucs_socket_getpeername(cep->fd, &ep_attr->remote_address,
                                        &addr_len);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}
/*
 * Interface operations table of the TCP sockcm. It is handed to the uct_cm_t
 * super class in the uct_tcp_sockcm_t init function.
 * Only the connection-related endpoint operations have real handlers; every
 * other entry points to one of the generic ucs_empty_function* stubs.
 */
static uct_iface_ops_t uct_tcp_sockcm_iface_ops = {
.ep_pending_purge = (uct_ep_pending_purge_func_t)ucs_empty_function,
/* Connection establishment / teardown - implemented by the sockcm endpoint */
.ep_connect = uct_tcp_sockcm_ep_connect,
.ep_disconnect = uct_tcp_sockcm_ep_disconnect,
.cm_ep_conn_notify = uct_tcp_sockcm_cm_ep_conn_notify,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_ep_t),
/* Data-path operations - routed to the "unsupported" stub */
.ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_unsupported,
.ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_unsupported,
.ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_unsupported,
.ep_am_short = (uct_ep_am_short_func_t)ucs_empty_function_return_unsupported,
.ep_am_short_iov = (uct_ep_am_short_iov_func_t)ucs_empty_function_return_unsupported,
.ep_am_bcopy = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_unsupported,
.ep_atomic_cswap64 = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_unsupported,
.ep_atomic64_post = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_unsupported,
.ep_atomic64_fetch = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_unsupported,
.ep_atomic_cswap32 = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_unsupported,
.ep_atomic32_post = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_unsupported,
.ep_atomic32_fetch = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_unsupported,
.ep_pending_add = (uct_ep_pending_add_func_t)ucs_empty_function_return_unsupported,
/* Flush is the one data-path entry routed to the "success" stub */
.ep_flush = (uct_ep_flush_func_t)ucs_empty_function_return_success,
.ep_fence = (uct_ep_fence_func_t)ucs_empty_function_return_unsupported,
.ep_check = (uct_ep_check_func_t)ucs_empty_function_return_unsupported,
.ep_create = (uct_ep_create_func_t)ucs_empty_function_return_unsupported,
/* Interface-level operations - all routed to stubs */
.iface_flush = (uct_iface_flush_func_t)ucs_empty_function_return_unsupported,
.iface_fence = (uct_iface_fence_func_t)ucs_empty_function_return_unsupported,
.iface_progress_enable = ucs_empty_function,
.iface_progress_disable = ucs_empty_function,
.iface_progress = (uct_iface_progress_func_t)ucs_empty_function_return_zero,
.iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_unsupported,
.iface_event_arm = (uct_iface_event_arm_func_t)ucs_empty_function_return_unsupported,
.iface_close = ucs_empty_function,
.iface_query = (uct_iface_query_func_t)ucs_empty_function_return_unsupported,
.iface_get_device_address = (uct_iface_get_device_address_func_t)ucs_empty_function_return_unsupported,
.iface_get_address = (uct_iface_get_address_func_t)ucs_empty_function_return_unsupported,
.iface_is_reachable = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero
};
/*
 * Internal interface operations of the TCP sockcm. It is handed to the
 * uct_cm_t super class in the uct_tcp_sockcm_t init function.
 * Only the endpoint query has a real handler; the other entries are stubs.
 */
static uct_iface_internal_ops_t uct_tcp_sockcm_iface_internal_ops = {
.iface_estimate_perf = (uct_iface_estimate_perf_func_t)ucs_empty_function_return_unsupported,
.iface_vfs_refresh = (uct_iface_vfs_refresh_func_t)ucs_empty_function,
.ep_query = uct_tcp_sockcm_ep_query,
};
/**
 * Constructor of the TCP sockcm connection manager.
 *
 * Initializes the uct_cm_t super class with the sockcm operation tables, then
 * copies the relevant configuration values into the object and initializes
 * the (empty) list of endpoints.
 *
 * @param [in] component  Component the connection manager belongs to.
 * @param [in] worker     Worker the connection manager is created on.
 * @param [in] config     Connection manager configuration; accessed as a
 *                        uct_tcp_sockcm_config_t.
 *
 * @return UCS_OK once the fields below are set. NOTE(review): whether the
 *         super-class init macro can return early on failure is not visible
 *         here.
 */
UCS_CLASS_INIT_FUNC(uct_tcp_sockcm_t, uct_component_h component,
                    uct_worker_h worker, const uct_cm_config_t *config)
{
    uct_tcp_sockcm_config_t *tcp_config = ucs_derived_of(config,
                                                         uct_tcp_sockcm_config_t);

    UCS_CLASS_CALL_SUPER_INIT(uct_cm_t, &uct_tcp_sockcm_ops,
                              &uct_tcp_sockcm_iface_ops,
                              &uct_tcp_sockcm_iface_internal_ops,
                              worker, component, config);

    ucs_list_head_init(&self->ep_list);

    /* Socket-level settings are taken from the configuration as-is */
    self->syn_cnt        = tcp_config->syn_cnt;
    self->sockopt_rcvbuf = tcp_config->sockopt.rcvbuf;
    self->sockopt_sndbuf = tcp_config->sockopt.sndbuf;

    /* The stored private data length is the configured length plus the size
     * of the private data header */
    self->priv_data_len  = sizeof(uct_tcp_sockcm_priv_data_hdr_t) +
                           tcp_config->priv_data_len;

    ucs_debug("created tcp_sockcm %p", self);

    return UCS_OK;
}
/**
 * Destructor of the TCP sockcm connection manager.
 *
 * Calls uct_tcp_sockcm_close_ep() on every endpoint that is still on the
 * connection manager's endpoint list. The whole traversal is done between
 * UCS_ASYNC_BLOCK and UCS_ASYNC_UNBLOCK on the worker's async context.
 */
UCS_CLASS_CLEANUP_FUNC(uct_tcp_sockcm_t)
{
    uct_tcp_sockcm_ep_t *cur_ep;
    uct_tcp_sockcm_ep_t *next_ep;

    UCS_ASYNC_BLOCK(self->super.iface.worker->async);

    /* The "safe" iterator keeps the next element in next_ep while the
     * current one is being closed */
    ucs_list_for_each_safe(cur_ep, next_ep, &self->ep_list, list) {
        uct_tcp_sockcm_close_ep(cur_ep);
    }

    UCS_ASYNC_UNBLOCK(self->super.iface.worker->async);
}
/* Define the uct_tcp_sockcm_t class with uct_cm_t as its super class */
UCS_CLASS_DEFINE(uct_tcp_sockcm_t, uct_cm_t);
/* "new" function of the class; the listed types match the arguments of the
 * init function (component, worker, configuration) */
UCS_CLASS_DEFINE_NEW_FUNC(uct_tcp_sockcm_t, uct_cm_t, uct_component_h,
uct_worker_h, const uct_cm_config_t *);
/* "delete" function of the class; referenced by the .close entry of
 * uct_tcp_sockcm_ops */
UCS_CLASS_DEFINE_DELETE_FUNC(uct_tcp_sockcm_t, uct_cm_t);