#ifndef UCT_UD_EP_H
#define UCT_UD_EP_H
#include "ud_def.h"
#include <uct/api/uct.h>
#include <ucs/datastruct/frag_list.h>
#include <ucs/datastruct/queue.h>
#include <ucs/datastruct/arbiter.h>
#include <ucs/datastruct/sglib.h>
#include <ucs/datastruct/conn_match.h>
#include <ucs/time/timer_wheel.h>
/*
 * Endpoint ids are 24-bit values. The all-ones 24-bit value is the "null" id:
 * uct_ud_ep_t::dest_ep_id holds it while the endpoint is not connected.
 * The largest usable id and connection id are both defined in terms of it.
 */
#define UCT_UD_EP_NULL_ID        ((1<<24)-1)
#define UCT_UD_EP_ID_MAX         UCT_UD_EP_NULL_ID
#define UCT_UD_EP_CONN_ID_MAX    UCT_UD_EP_ID_MAX

/* Connection sequence number carried by an endpoint (uct_ud_ep_t::conn_sn). */
typedef uint32_t uct_ud_ep_conn_sn_t;
#if UCT_UD_EP_DEBUG_HOOKS
/*
 * Debug hooks: per-endpoint callbacks invoked on TX, RX and timer events.
 *
 * Every hook macro expands to exactly one statement and expects the caller to
 * supply the terminating ';'. This makes the macros safe inside unbraced
 * if/else bodies. When UCT_UD_EP_DEBUG_HOOKS is 0, they expand to nothing.
 */
typedef ucs_status_t (*uct_ud_ep_hook_t)(uct_ud_ep_t *ep, uct_ud_neth_t *neth);

/* Declares a hook member inside a struct; the expansion carries its own ';'
 * because the struct declarations using it are written without one. */
#define UCT_UD_EP_HOOK_DECLARE(name) uct_ud_ep_hook_t name;

/* Runs the RX hook. Any status other than UCS_OK drops the packet by
 * returning from the calling function, which must therefore return void.
 * 'len' is accepted for interface compatibility and is unused. */
#define UCT_UD_EP_HOOK_CALL_RX(ep, neth, len) \
    do { \
        if ((ep)->rx.rx_hook((ep), (neth)) != UCS_OK) { \
            ucs_trace_data("RX: dropping packet"); \
            return; \
        } \
    } while (0)

/* Run the TX / timer hooks; their return status is ignored. */
#define UCT_UD_EP_HOOK_CALL_TX(ep, neth) ((void)(ep)->tx.tx_hook((ep), (neth)))
#define UCT_UD_EP_HOOK_CALL_TIMER(ep)    ((void)(ep)->timer_hook((ep), NULL))

/* Default hook: accepts every packet/event unchanged. */
static inline ucs_status_t uct_ud_ep_null_hook(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
{
    return UCS_OK;
}

/* Installs uct_ud_ep_null_hook in all three hook slots of an endpoint.
 * No trailing ';' after while (0): the caller provides it. */
#define UCT_UD_EP_HOOK_INIT(ep) \
    do { \
        (ep)->tx.tx_hook = uct_ud_ep_null_hook; \
        (ep)->rx.rx_hook = uct_ud_ep_null_hook; \
        (ep)->timer_hook = uct_ud_ep_null_hook; \
    } while (0)
#else
#define UCT_UD_EP_HOOK_DECLARE(name)
#define UCT_UD_EP_HOOK_CALL_RX(ep, neth, len)
#define UCT_UD_EP_HOOK_CALL_TX(ep, neth)
#define UCT_UD_EP_HOOK_CALL_TIMER(ep)
#define UCT_UD_EP_HOOK_INIT(ep)
#endif
/*
 * Control operations an endpoint may have scheduled. The values are single
 * bits OR-ed together into uct_ud_ep_pending_op_t::ops and manipulated through
 * uct_ud_ep_ctl_op_{del,check,isany,check_ex}().
 */
enum {
    UCT_UD_EP_OP_NONE     = 0,
    UCT_UD_EP_OP_ACK      = UCS_BIT(0),
    UCT_UD_EP_OP_ACK_REQ  = UCS_BIT(1),
    UCT_UD_EP_OP_RESEND   = UCS_BIT(2),
    UCT_UD_EP_OP_CREP     = UCS_BIT(3),
    UCT_UD_EP_OP_CREQ     = UCS_BIT(4),
    UCT_UD_EP_OP_NACK     = UCS_BIT(5),
};

/*
 * Groupings of the op bits above. NACK belongs to CTL_ACK only; it is in
 * neither the low- nor the high-priority group.
 */
#define UCT_UD_EP_OP_CTL_LOW_PRIO (UCT_UD_EP_OP_ACK | UCT_UD_EP_OP_ACK_REQ)
#define UCT_UD_EP_OP_CTL_HI_PRIO  (UCT_UD_EP_OP_RESEND | UCT_UD_EP_OP_CREP | \
                                   UCT_UD_EP_OP_CREQ)
#define UCT_UD_EP_OP_CTL_ACK      (UCT_UD_EP_OP_NACK | UCT_UD_EP_OP_ACK_REQ | \
                                   UCT_UD_EP_OP_ACK)
/*
 * Scheduled control-operation state of an endpoint (embedded as
 * uct_ud_ep_t::tx.pending): the set of pending UCT_UD_EP_OP_* bits plus the
 * arbiter group/element used to schedule them.
 */
typedef struct uct_ud_ep_pending_op uct_ud_ep_pending_op_t;

struct uct_ud_ep_pending_op {
    ucs_arbiter_group_t group;  /* arbiter group of the endpoint */
    uint32_t            ops;    /* bitmask of pending UCT_UD_EP_OP_* values */
    ucs_arbiter_elem_t  elem;   /* arbiter element */
};
/* Per-endpoint statistics counters: only a TODO placeholder is defined so far. */
enum {
    UCT_UD_EP_STAT_TODO = 0
};
/*
 * Endpoint state bits stored in uct_ud_ep_t::flags. That field is a uint16_t,
 * so every flag must stay below bit 16.
 */
enum {
    UCT_UD_EP_FLAG_DISCONNECTED  = UCS_BIT(0),
    UCT_UD_EP_FLAG_PRIVATE       = UCS_BIT(1),
    UCT_UD_EP_FLAG_HAS_PENDING   = UCS_BIT(2),
    UCT_UD_EP_FLAG_CONNECTED     = UCS_BIT(3),
    UCT_UD_EP_FLAG_ON_CEP        = UCS_BIT(4),

    /* CREQ / CREP progress bits */
    UCT_UD_EP_FLAG_CREQ_RCVD     = UCS_BIT(5),
    UCT_UD_EP_FLAG_CREP_RCVD     = UCS_BIT(6),
    UCT_UD_EP_FLAG_CREQ_SENT     = UCS_BIT(7),
    UCT_UD_EP_FLAG_CREP_SENT     = UCS_BIT(8),
    UCT_UD_EP_FLAG_CREQ_NOTSENT  = UCS_BIT(9),

    UCT_UD_EP_FLAG_TX_NACKED     = UCS_BIT(10),

    /* Real bit only when assertions are enabled. Otherwise it is 0, so setting
     * or testing it compiles to a no-op. */
#if UCS_ENABLE_ASSERT
    UCT_UD_EP_FLAG_IN_PENDING    = UCS_BIT(11)
#else
    UCT_UD_EP_FLAG_IN_PENDING    = 0
#endif
};
/*
 * Peer identity kept for diagnostics. It is embedded in uct_ud_ep_t only when
 * ENABLE_DEBUG_DATA is set and printed as "name:pid".
 */
struct uct_ud_peer_name {
    char name[16];  /* peer name, fixed 16-byte buffer */
    int  pid;       /* peer pid, printed after the name */
};
typedef struct uct_ud_peer_name uct_ud_peer_name_t;
/*
 * UD transport endpoint.
 *
 * Invariants visible in this header:
 *  - dest_ep_id == UCT_UD_EP_NULL_ID exactly when UCT_UD_EP_FLAG_CONNECTED is
 *    clear in flags (asserted in uct_ud_ep_is_connected()).
 *  - flags holds UCT_UD_EP_FLAG_* bits in 16 bits.
 */
struct uct_ud_ep {
/* base UCT endpoint (first member) */
uct_base_ep_t super;
/* this endpoint's own id -- presumably bounded by UCT_UD_EP_ID_MAX (TODO confirm) */
uint32_t ep_id;
/* destination endpoint id: OR-ed into neth->packet_type by
 * uct_ud_neth_set_type_am/put; UCT_UD_EP_NULL_ID while not connected */
uint32_t dest_ep_id;
/* send side */
struct {
/* PSN written into outgoing headers by uct_ud_neth_init_data() */
uct_ud_psn_t psn;
/* send window limit: uct_ud_ep_no_window() is true once psn >= max_psn */
uct_ud_psn_t max_psn;
/* together with max_psn, selects where uct_ud_ep_req_ack() requests an ACK */
uct_ud_psn_t acked_psn;
uint16_t resend_count;
/* queue of sent packets -- presumably those not yet acknowledged (TODO confirm) */
ucs_queue_head_t window;
/* pending control ops (UCT_UD_EP_OP_* bits in pending.ops) and arbiter linkage */
uct_ud_ep_pending_op_t pending;
ucs_time_t send_time;
ucs_time_t resend_time;
ucs_time_t tick;
UCS_STATS_NODE_DECLARE(stats)
UCT_UD_EP_HOOK_DECLARE(tx_hook)
} tx;
/* receive side */
struct {
/* ack_psn last written to an outgoing header: uct_ud_neth_init_data() sets it
 * to ucs_frag_list_sn(&ooo_pkts) */
uct_ud_psn_t acked_psn;
/* receive fragment list; its sequence number is what gets advertised as ack_psn */
ucs_frag_list_t ooo_pkts;
UCS_STATS_NODE_DECLARE(stats)
UCT_UD_EP_HOOK_DECLARE(rx_hook)
} rx;
/* window parameters -- "ca" presumably stands for congestion avoidance (TODO confirm) */
struct {
uct_ud_psn_t wmax;
uct_ud_psn_t cwnd;
} ca;
/* resend cursor (packed): a position in a queue plus a PSN range */
struct UCS_S_PACKED {
ucs_queue_iter_t pos;
uct_ud_psn_t psn;
uct_ud_psn_t max_psn;
} resend;
ucs_conn_match_elem_t conn_match;
uct_ud_ep_conn_sn_t conn_sn;
/* UCT_UD_EP_FLAG_* bits; only 16 bits wide */
uint16_t flags;
uint8_t rx_creq_count;
uint8_t path_index;
ucs_wtimer_t timer;
ucs_time_t close_time;
UCS_STATS_NODE_DECLARE(stats)
UCT_UD_EP_HOOK_DECLARE(timer_hook)
/* peer identity, present only in debug-data builds */
#if ENABLE_DEBUG_DATA
uct_ud_peer_name_t peer;
#endif
};
/*
 * printf-style helpers for logging an endpoint's peer. Use them together:
 *     ucs_debug("ep peer " UCT_UD_EP_PEER_NAME_FMT, UCT_UD_EP_PEER_NAME_ARG(ep));
 * Without debug data, the argument is a fixed placeholder string and the
 * endpoint is not dereferenced.
 */
#if !ENABLE_DEBUG_DATA
#  define UCT_UD_EP_PEER_NAME_FMT        "%s"
#  define UCT_UD_EP_PEER_NAME_ARG(_ep)   "<no debug data>"
#else
#  define UCT_UD_EP_PEER_NAME_FMT        "%s:%d"
#  define UCT_UD_EP_PEER_NAME_ARG(_ep)   (_ep)->peer.name, (_ep)->peer.pid
#endif
/* UCS class declaration for uct_ud_ep_t. The extra arguments are presumably the
 * constructor parameters: the owning UD interface and the endpoint creation
 * parameters (TODO confirm against UCS_CLASS_DECLARE). */
UCS_CLASS_DECLARE(uct_ud_ep_t, uct_ud_iface_t*, const uct_ep_params_t*)
/*
 * UD-private state overlaid on the private area of a uct_pending_req_t.
 */
typedef struct {
    uct_pending_req_priv_arb_t arb;    /* arbiter-related private part */
    unsigned                   flags;
} uct_ud_pending_req_priv_t;

/*
 * Return the private area of @a req, viewed as uct_ud_pending_req_priv_t.
 */
static UCS_F_ALWAYS_INLINE uct_ud_pending_req_priv_t *
uct_ud_pending_req_priv(uct_pending_req_t *req)
{
    void *priv_area = &req->priv;

    return priv_area;
}
/* Presumably completes all outstanding sends in ud_ep's TX window with
 * 'status'. is_async tells whether the caller runs in async context
 * (TODO confirm in the implementation). */
void uct_ud_tx_wnd_purge_outstanding(uct_ud_iface_t *iface, uct_ud_ep_t *ud_ep,
ucs_status_t status, int is_async);
/* Endpoint flush; same signature as uct_ep_flush(). */
ucs_status_t uct_ud_ep_flush(uct_ep_h ep, unsigned flags,
uct_completion_t *comp);
/* Flush variant taking the UD interface and endpoint directly. The name
 * suggests the caller already holds the interface lock (TODO confirm). */
ucs_status_t uct_ud_ep_flush_nolock(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
uct_completion_t *comp);
/* Endpoint check; same signature as uct_ep_check(). */
ucs_status_t uct_ud_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp);
/* Packs the endpoint address into *addr; same signature as uct_ep_get_address(). */
ucs_status_t uct_ud_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);
/* Creates an endpoint from 'params' and stores it in *new_ep_p. */
ucs_status_t uct_ud_ep_create_connected_common(const uct_ep_params_t *params,
uct_ep_h *new_ep_p);
/* Connects to a remote endpoint address; same signature as uct_ep_connect_to_ep(). */
ucs_status_t uct_ud_ep_connect_to_ep(uct_ep_h tl_ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *uct_ep_addr);
/* Pending-queue management; same signatures as uct_ep_pending_add/purge(). */
ucs_status_t uct_ud_ep_pending_add(uct_ep_h ep, uct_pending_req_t *n,
unsigned flags);
void uct_ud_ep_pending_purge(uct_ep_h ep, uct_pending_purge_callback_t cb,
void *arg);
void uct_ud_ep_disconnect(uct_ep_h ep);
/* Releases completed entries from the endpoint's TX window (is_async as above). */
void uct_ud_ep_window_release_completed(uct_ud_ep_t *ep, int is_async);
/* Builds and returns a send skb -- presumably one carrying a CREQ packet for
 * 'ep'; check the implementation for the failure return value. */
uct_ud_send_skb_t *uct_ud_ep_prepare_creq(uct_ud_ep_t *ep);
/* Arbiter dispatch callback for an endpoint's pending queue (ucs_arbiter
 * callback signature: arbiter, group, elem, user arg). */
ucs_arbiter_cb_result_t
uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
ucs_arbiter_elem_t *elem, void *arg);
/* Copies state from old_ep into new_ep (which fields: TODO confirm). */
void uct_ud_ep_clone(uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep);
/*
 * Set the network-header packet type for an active message: the AM id placed
 * at UCT_UD_PACKET_AM_ID_SHIFT, the destination endpoint id, and the
 * UCT_UD_PACKET_FLAG_AM flag.
 *
 * @param ep    sending endpoint (provides dest_ep_id)
 * @param neth  header to fill; packet_type is overwritten
 * @param id    active-message id
 */
static UCS_F_ALWAYS_INLINE void
uct_ud_neth_set_type_am(uct_ud_ep_t *ep, uct_ud_neth_t *neth, uint8_t id)
{
    /* Shift an unsigned value. A bare uint8_t is promoted to signed int, and
     * shifting a set bit into (or past) the sign bit is undefined behavior. */
    neth->packet_type = ((uint32_t)id << UCT_UD_PACKET_AM_ID_SHIFT) |
                        ep->dest_ep_id |
                        UCT_UD_PACKET_FLAG_AM;
}
/*
 * Set the network-header packet type for a PUT: the PUT flag combined with
 * the destination endpoint id. Overwrites neth->packet_type.
 */
static UCS_F_ALWAYS_INLINE void
uct_ud_neth_set_type_put(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
{
    neth->packet_type = UCT_UD_PACKET_FLAG_PUT | ep->dest_ep_id;
}
/* Handles a received packet: 'neth' is its network header, 'byte_len' its
 * length and 'skb' the receive buffer holding it. is_async tells whether the
 * caller runs in async context (TODO confirm ownership of skb after the call). */
void uct_ud_ep_process_rx(uct_ud_iface_t *iface,
uct_ud_neth_t *neth, unsigned byte_len,
uct_ud_recv_skb_t *skb, int is_async);
/*
 * Stamp the sequencing fields of an outgoing header:
 *  - psn:     the endpoint's current tx.psn
 *  - ack_psn: the receive-side sequence number of rx.ooo_pkts. The same value
 *             is recorded in rx.acked_psn as the last advertised ACK.
 */
static UCS_F_ALWAYS_INLINE void
uct_ud_neth_init_data(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
{
    neth->psn = ep->tx.psn;

    ep->rx.acked_psn = ucs_frag_list_sn(&ep->rx.ooo_pkts);
    neth->ack_psn    = ep->rx.acked_psn;
}
/* Clear the given UCT_UD_EP_OP_* bits from the endpoint's pending-ops mask. */
static UCS_F_ALWAYS_INLINE void
uct_ud_ep_ctl_op_del(uct_ud_ep_t *ep, uint32_t ops)
{
    uint32_t *pending_ops = &ep->tx.pending.ops;

    *pending_ops &= ~ops;
}
/* Non-zero (the matching bits) if any of the 'op' bits is pending on 'ep'. */
static UCS_F_ALWAYS_INLINE int
uct_ud_ep_ctl_op_check(uct_ud_ep_t *ep, uint32_t op)
{
    uint32_t pending_ops = ep->tx.pending.ops;

    return pending_ops & op;
}
/* Non-zero (the raw mask) if any control op at all is pending on 'ep'. */
static UCS_F_ALWAYS_INLINE int
uct_ud_ep_ctl_op_isany(uct_ud_ep_t *ep)
{
    const uint32_t pending_ops = ep->tx.pending.ops;

    return pending_ops;
}
/*
 * Return 1 if at least one of 'ops' is pending and nothing outside 'ops' is
 * pending; otherwise return 0.
 */
static UCS_F_ALWAYS_INLINE int
uct_ud_ep_ctl_op_check_ex(uct_ud_ep_t *ep, uint32_t ops)
{
    uint32_t pending_ops = ep->tx.pending.ops;

    if (!(pending_ops & ops)) {
        return 0;
    }

    return (pending_ops & ~ops) == 0;
}
/*
 * Non-zero (the CONNECTED flag bit) if the endpoint is connected. Asserts
 * that dest_ep_id is UCT_UD_EP_NULL_ID exactly when the flag is clear.
 */
static UCS_F_ALWAYS_INLINE int uct_ud_ep_is_connected(uct_ud_ep_t *ep)
{
    int connected = ep->flags & UCT_UD_EP_FLAG_CONNECTED;

    ucs_assert((ep->dest_ep_id == UCT_UD_EP_NULL_ID) == !connected);
    return connected;
}
/* Return 1 if the endpoint is connected and has no HAS_PENDING flag, else 0. */
static UCS_F_ALWAYS_INLINE int
uct_ud_ep_is_connected_and_no_pending(uct_ud_ep_t *ep)
{
    if (!(ep->flags & UCT_UD_EP_FLAG_CONNECTED)) {
        return 0;
    }

    return !(ep->flags & UCT_UD_EP_FLAG_HAS_PENDING);
}
/* Non-zero when the send window is exhausted, i.e. tx.psn has reached
 * tx.max_psn (compared with UCT_UD_PSN_COMPARE). */
static UCS_F_ALWAYS_INLINE int uct_ud_ep_no_window(uct_ud_ep_t *ep)
{
    const uct_ud_psn_t next_psn = ep->tx.psn;
    const uct_ud_psn_t limit    = ep->tx.max_psn;

    return UCT_UD_PSN_COMPARE(next_psn, >=, limit);
}
/*
 * Decide whether the packet about to be sent should request an ACK.
 *
 * Returns non-zero when any of the following holds:
 *  - tx.psn is at the first-quarter point of the window [tx.acked_psn, tx.max_psn),
 *  - tx.psn is the last PSN before tx.max_psn, or
 *  - an explicit ACK request (UCT_UD_EP_OP_ACK_REQ) is pending.
 */
static UCS_F_ALWAYS_INLINE int uct_ud_ep_req_ack(uct_ud_ep_t *ep)
{
    uct_ud_psn_t acked_psn = ep->tx.acked_psn;
    uct_ud_psn_t max_psn   = ep->tx.max_psn;
    uct_ud_psn_t psn       = ep->tx.psn;
    uct_ud_psn_t window, quarter_psn;

    /*
     * Without wraparound this equals (3 * acked_psn + max_psn) / 4. Computing
     * it from the modular window size keeps it correct after the PSN counter
     * wraps (max_psn numerically below acked_psn). The plain int expression
     * would land far outside the window there. Assumes uct_ud_psn_t is an
     * unsigned counter type.
     */
    window      = (uct_ud_psn_t)(max_psn - acked_psn);
    quarter_psn = (uct_ud_psn_t)(acked_psn + (window >> 2));

    return UCT_UD_PSN_COMPARE(psn, ==, quarter_psn) ||
           UCT_UD_PSN_COMPARE(psn + 1, ==, max_psn) ||
           uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK_REQ);
}
/*
 * Set the ACK-request bit (at UCT_UD_PACKET_ACK_REQ_SHIFT) in neth->packet_type
 * when uct_ud_ep_req_ack() says so. Then clear the pending ACK and ACK_REQ
 * control ops. The decision is made first because it reads ACK_REQ.
 */
static UCS_F_ALWAYS_INLINE void
uct_ud_neth_ack_req(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
{
    int ack_requested = uct_ud_ep_req_ack(ep);

    neth->packet_type |= ack_requested << UCT_UD_PACKET_ACK_REQ_SHIFT;
    uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ | UCT_UD_EP_OP_ACK);
}
#endif