#ifndef UCP_PROTO_MULTI_INL_
#define UCP_PROTO_MULTI_INL_
#include "proto_multi.h"
#include <ucp/proto/proto_common.inl>
static UCS_F_ALWAYS_INLINE void
ucp_proto_multi_set_send_lane(ucp_request_t *req)
{
#if ENABLE_ASSERT
req->send.lane = UCP_NULL_LANE;
#endif
}
static UCS_F_ALWAYS_INLINE void
ucp_proto_multi_request_init(ucp_request_t *req)
{
req->send.multi_lane_idx = 0;
ucp_proto_multi_set_send_lane(req);
}
static UCS_F_ALWAYS_INLINE uint32_t
ucs_proto_multi_calc_weight(double lane_weight, double total_weight)
{
return (uint32_t)(
((lane_weight * UCP_PROTO_MULTI_WEIGHT_MAX) / total_weight) + 0.5);
}
static UCS_F_ALWAYS_INLINE size_t
ucp_proto_multi_scaled_length(uint32_t weight, size_t length)
{
return ((weight * length) + UCP_PROTO_MULTI_WEIGHT_MAX - 1) >>
UCP_PROTO_MULTI_WEIGHT_SHIFT;
}
static UCS_F_ALWAYS_INLINE size_t
ucp_proto_multi_max_payload(ucp_request_t *req,
const ucp_proto_multi_lane_priv_t *lpriv,
size_t hdr_size)
{
size_t length = req->send.state.dt_iter.length;
size_t max_frag = lpriv->max_frag - hdr_size;
size_t max_payload;
if (length < UCP_MIN_BCOPY) {
return max_frag;
}
max_payload = ucs_min(ucp_proto_multi_scaled_length(lpriv->weight, length),
max_frag);
ucs_assertv(max_payload > 0,
"length=%zu weight=%zu%% lpriv->max_frag=%zu hdr_size=%zu",
req->send.state.dt_iter.length,
ucp_proto_multi_scaled_length(lpriv->weight, 100),
lpriv->max_frag, hdr_size);
return max_payload;
}
static size_t UCS_F_ALWAYS_INLINE
ucp_proto_multi_data_pack(ucp_proto_multi_pack_ctx_t *pack_ctx, void *dest)
{
ucp_request_t *req = pack_ctx->req;
return ucp_datatype_iter_next_pack(&req->send.state.dt_iter,
req->send.ep->worker,
pack_ctx->max_payload,
pack_ctx->next_iter, dest);
}
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_multi_no_resource(ucp_request_t *req, ucp_lane_index_t lane)
{
ucs_status_t status;
uct_ep_h uct_ep;
if (lane == req->send.lane) {
return UCS_ERR_NO_RESOURCE;
}
uct_ep = req->send.ep->uct_eps[lane];
status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
if (status == UCS_ERR_BUSY) {
return UCS_INPROGRESS;
}
ucs_assert(status == UCS_OK);
req->send.lane = lane;
return UCS_OK;
}
static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_multi_handle_send_error(
ucp_request_t *req, ucp_lane_index_t lane, ucs_status_t status)
{
if (ucs_likely(status == UCS_ERR_NO_RESOURCE)) {
return ucp_proto_multi_no_resource(req, lane);
}
ucp_proto_request_abort(req, status);
return UCS_OK;
}
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_multi_progress(ucp_request_t *req,
const ucp_proto_multi_priv_t *mpriv,
ucp_proto_send_multi_cb_t send_func,
ucp_proto_complete_cb_t complete_func,
unsigned dt_mask)
{
const ucp_proto_multi_lane_priv_t *lpriv;
ucp_datatype_iter_t next_iter;
ucp_lane_index_t lane_idx;
ucs_status_t status;
ucs_assertv(req->send.multi_lane_idx < mpriv->num_lanes,
"lane_idx=%d num_lanes=%d", req->send.multi_lane_idx,
mpriv->num_lanes);
lane_idx = req->send.multi_lane_idx;
lpriv = &mpriv->lanes[lane_idx];
status = send_func(req, lpriv, &next_iter);
if (ucs_likely(status == UCS_OK)) {
} else if (status == UCS_INPROGRESS) {
++req->send.state.uct_comp.count;
} else {
return ucp_proto_multi_handle_send_error(req, lpriv->super.lane,
status);
}
ucp_datatype_iter_copy_position(&req->send.state.dt_iter, &next_iter,
dt_mask);
if (ucp_datatype_iter_is_end(&req->send.state.dt_iter)) {
return complete_func(req);
}
lane_idx = req->send.multi_lane_idx + 1;
if (lane_idx >= mpriv->num_lanes) {
lane_idx = 0;
}
req->send.multi_lane_idx = lane_idx;
return UCS_INPROGRESS;
}
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_multi_bcopy_progress(ucp_request_t *req,
const ucp_proto_multi_priv_t *mpriv,
ucp_proto_init_cb_t init_func,
ucp_proto_send_multi_cb_t send_func,
ucp_proto_complete_cb_t comp_func)
{
if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
ucp_proto_multi_request_init(req);
if (init_func != NULL) {
init_func(req);
}
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}
return ucp_proto_multi_progress(req, mpriv, send_func, comp_func, UINT_MAX);
}
static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_multi_zcopy_progress(
ucp_request_t *req, const ucp_proto_multi_priv_t *mpriv,
ucp_proto_init_cb_t init_func, unsigned uct_mem_flags, unsigned dt_mask,
ucp_proto_send_multi_cb_t send_func,
ucp_proto_complete_cb_t complete_func,
uct_completion_callback_t uct_comp_cb)
{
ucs_status_t status;
if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
status = ucp_proto_request_zcopy_init(req, mpriv->reg_md_map,
uct_comp_cb, uct_mem_flags,
dt_mask);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}
ucp_proto_multi_request_init(req);
if (init_func != NULL) {
init_func(req);
}
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}
return ucp_proto_multi_progress(req, mpriv, send_func, complete_func,
dt_mask);
}
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_multi_lane_map_progress(ucp_request_t *req, ucp_lane_map_t *lane_map,
ucp_proto_multi_lane_send_func_t send_func)
{
ucp_lane_index_t lane = ucs_ffs32(*lane_map);
ucs_status_t status;
ucs_assert(*lane_map != 0);
status = send_func(req, lane);
if (ucs_likely(status == UCS_OK)) {
} else if (status == UCS_INPROGRESS) {
++req->send.state.uct_comp.count;
} else {
return ucp_proto_multi_handle_send_error(req, lane, status);
}
*lane_map &= ~UCS_BIT(lane);
if (*lane_map != 0) {
return UCS_INPROGRESS;
}
ucp_request_invoke_uct_completion_success(req);
return UCS_OK;
}
#endif