ucx1-sys 0.1.0

Rust FFI bindings to UCX.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCT_RC_IFACE_H
#define UCT_RC_IFACE_H

#include "rc_def.h"

#include <uct/base/uct_iface.h>
#include <uct/ib/base/ib_log.h>
#include <uct/ib/base/ib_iface.h>
#include <ucs/datastruct/arbiter.h>
#include <ucs/datastruct/queue.h>
#include <ucs/datastruct/ptr_array.h>
#include <ucs/debug/log.h>


/* Geometry of the two-level QP-number -> endpoint table (the 'eps' member of
 * struct uct_rc_iface, read by uct_rc_iface_lookup_ep() in this header). */
#define UCT_RC_QP_TABLE_ORDER       12
/* Number of first-level entries in the table */
#define UCT_RC_QP_TABLE_SIZE        UCS_BIT(UCT_RC_QP_TABLE_ORDER)
/* QP-number bits that remain after removing UCT_RC_QP_TABLE_ORDER bits */
#define UCT_RC_QP_TABLE_MEMB_ORDER  (UCT_IB_QPN_ORDER - UCT_RC_QP_TABLE_ORDER)
/* NOTE(review): presumably the upper bound for the QP retry counters - confirm
 * against the users of this constant. */
#define UCT_RC_QP_MAX_RETRY_COUNT   7

/*
 * Parameter checks for a short active message: validates the AM id, then
 * checks sizeof(_header_t) plus the payload length against _max_inline.
 * _length is parenthesized so that an expression argument (for example a
 * conditional expression) is added to the header size as a whole.
 * The expansion keeps its trailing ';' for compatibility with existing
 * call sites.
 */
#define UCT_RC_CHECK_AM_SHORT(_am_id, _length, _header_t, _max_inline) \
     UCT_CHECK_AM_ID(_am_id); \
     UCT_CHECK_LENGTH(sizeof(_header_t) + (_length), 0, _max_inline, "am_short");

/*
 * Length checks for zero-copy data: the sum of header and payload lengths is
 * checked against the segment size and against UCT_IB_MAX_MESSAGE_SIZE.
 * Both length arguments are parenthesized so that expression arguments are
 * summed as whole operands.
 * The expansion ends with ';' - UCT_RC_CHECK_AM_ZCOPY expands this macro
 * without adding one.
 */
#define UCT_RC_CHECK_ZCOPY_DATA(_header_length, _length, _seg_size) \
    UCT_CHECK_LENGTH((_header_length) + (_length), 0, _seg_size, "am_zcopy payload"); \
    UCT_CHECK_LENGTH((_header_length) + (_length), 0, UCT_IB_MAX_MESSAGE_SIZE, "am_zcopy ib max message");

/*
 * Parameter checks for a zero-copy active message: validates the AM id, runs
 * the zero-copy data length checks, then checks that the RC header plus the
 * user header fits in _desc_size.
 * _header_length is parenthesized so that an expression argument is added to
 * sizeof(uct_rc_hdr_t) as a whole.
 */
#define UCT_RC_CHECK_AM_ZCOPY(_id, _header_length, _length, _desc_size, _seg_size) \
    UCT_CHECK_AM_ID(_id); \
    UCT_RC_CHECK_ZCOPY_DATA(_header_length, _length, _seg_size) \
    UCT_CHECK_LENGTH(sizeof(uct_rc_hdr_t) + (_header_length), 0, _desc_size, "am_zcopy header");


/*
 * Take a TX descriptor for an RC interface by delegating to
 * UCT_TL_IFACE_GET_TX_DESC with the base interface embedded in _iface.
 * The failure action handed to that macro is "return UCS_ERR_NO_RESOURCE"
 * (presumably executed when no descriptor can be taken - confirm in
 * UCT_TL_IFACE_GET_TX_DESC), so the expansion may return from the enclosing
 * function.
 * The expansion already ends with ';'; other macros in this header expand it
 * without adding one.
 */
#define UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    UCT_TL_IFACE_GET_TX_DESC(&(_iface)->super.super, _mp, _desc, \
                             return UCS_ERR_NO_RESOURCE);

/*
 * Statement expression that prepares a descriptor for a buffered-copy AM:
 *  - takes a descriptor from _mp (may return UCS_ERR_NO_RESOURCE from the
 *    enclosing function, see UCT_RC_IFACE_GET_TX_DESC),
 *  - sets its completion handler to ucs_mpool_put,
 *  - places a header of type _hdr right after the descriptor and fills it
 *    with _pk_hdr_cb(header, _id),
 *  - packs the payload right after the header with _pack_cb(dest, _arg) and
 *    stores the callback's return value through the pointer _length.
 * _desc is parenthesized wherever it is used as an operand, so an expression
 * argument is handled as a whole.
 */
#define UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(_iface, _mp, _desc, _id, _pk_hdr_cb, \
                                          _hdr, _pack_cb, _arg, _length) ({ \
    _hdr *rch; \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    (_desc)->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; \
    rch = (_hdr *)((_desc) + 1); \
    _pk_hdr_cb(rch, _id); \
    *(_length) = _pack_cb(rch + 1, _arg); \
})

/*
 * Prepare a descriptor for a zero-copy AM: takes a descriptor from _mp (may
 * return UCS_ERR_NO_RESOURCE from the enclosing function), sets up its
 * completion handling and the send flags via uct_rc_zcopy_desc_set_comp(),
 * and writes the RC header plus the user header right after the descriptor.
 * _desc is parenthesized in the pointer arithmetic so an expression argument
 * is advanced as a whole.
 */
#define UCT_RC_IFACE_GET_TX_AM_ZCOPY_DESC(_iface, _mp, _desc, \
                                          _id, _header, _header_length, _comp, _send_flags) \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc); \
    uct_rc_zcopy_desc_set_comp(_desc, _comp, _send_flags); \
    uct_rc_zcopy_desc_set_header((uct_rc_hdr_t*)((_desc) + 1), _id, _header, _header_length);

/*
 * Prepare a descriptor for a buffered-copy PUT: takes a descriptor from _mp
 * (may return UCS_ERR_NO_RESOURCE from the enclosing function), sets its
 * completion handler to ucs_mpool_put, packs the payload right after the
 * descriptor with _pack_cb(dest, _arg), assigns the callback's return value
 * to _length and hands (_length, _desc) to UCT_SKIP_ZERO_LENGTH.
 * _length must be an lvalue. _desc is parenthesized in the pointer arithmetic
 * so an expression argument is advanced as a whole.
 */
#define UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(_iface, _mp, _desc, _pack_cb, _arg, _length) \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    (_desc)->super.handler = (uct_rc_send_handler_t)ucs_mpool_put; \
    _length = _pack_cb((_desc) + 1, _arg); \
    UCT_SKIP_ZERO_LENGTH(_length, _desc);

/*
 * Prepare a descriptor for a buffered-copy GET: takes a descriptor from _mp
 * (may return UCS_ERR_NO_RESOURCE from the enclosing function), asserts that
 * _length does not exceed the interface segment size, and records the unpack
 * callback, its argument, the user completion and the length in the
 * descriptor. The handler is the "no completion" variant when _comp is NULL.
 * Every macro argument used as an operand is parenthesized; note that _desc,
 * _comp and _length are evaluated more than once.
 */
#define UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(_iface, _mp, _desc, _unpack_cb, _comp, _arg, _length) \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    ucs_assert((_length) <= (_iface)->super.config.seg_size); \
    (_desc)->super.handler     = ((_comp) == NULL) ? \
                                  uct_rc_ep_get_bcopy_handler_no_completion : \
                                  uct_rc_ep_get_bcopy_handler; \
    (_desc)->super.unpack_arg  = (_arg); \
    (_desc)->super.user_comp   = (_comp); \
    (_desc)->super.length      = (_length); \
    (_desc)->unpack_cb         = (_unpack_cb);


/*
 * Prepare a descriptor for an atomic operation: takes a descriptor from _mp
 * (may return UCS_ERR_NO_RESOURCE from the enclosing function) and sets its
 * completion handler to ucs_mpool_put.
 * _desc is parenthesized before the member access, matching the other
 * descriptor macros in this header.
 */
#define UCT_RC_IFACE_GET_TX_ATOMIC_DESC(_iface, _mp, _desc) \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    (_desc)->super.handler = (uct_rc_send_handler_t)ucs_mpool_put;

/*
 * Prepare a descriptor for a fetching atomic operation: rejects a NULL
 * completion through UCT_CHECK_PARAM, takes a descriptor from _mp (may return
 * UCS_ERR_NO_RESOURCE from the enclosing function) and stores the handler,
 * the result buffer and the user completion in it.
 * Macro arguments used as operands are parenthesized; _comp and _desc are
 * evaluated more than once.
 */
#define UCT_RC_IFACE_GET_TX_ATOMIC_FETCH_DESC(_iface, _mp, _desc, _handler, _result, _comp) \
    UCT_CHECK_PARAM((_comp) != NULL, "completion must be non-NULL"); \
    UCT_RC_IFACE_GET_TX_DESC(_iface, _mp, _desc) \
    (_desc)->super.handler   = (_handler); \
    (_desc)->super.buffer    = (_result); \
    (_desc)->super.user_comp = (_comp);


/* NOTE(review): these look like indices of the per-interface statistics
 * counters (see the 'stats' node declared in struct uct_rc_iface) - confirm
 * against the stats class definition.
 * UCT_RC_IFACE_STAT_LAST equals the number of entries that precede it. */
enum {
    UCT_RC_IFACE_STAT_RX_COMPLETION,
    UCT_RC_IFACE_STAT_TX_COMPLETION,
    UCT_RC_IFACE_STAT_NO_CQE,
    UCT_RC_IFACE_STAT_NO_READS,
    UCT_RC_IFACE_STAT_LAST
};


/* Flags for the 16-bit 'flags' field of uct_rc_iface_send_op_t.
 * A flag whose feature is compiled out (NVALGRIND defined, or
 * UCS_ENABLE_ASSERT false) is defined as 0, so setting or testing it has no
 * effect in that configuration. */
enum {
    UCT_RC_IFACE_SEND_OP_STATUS     = UCS_BIT(11), /* status field is valid */
#ifdef NVALGRIND
    UCT_RC_IFACE_SEND_OP_FLAG_IOV   = 0,
#else
    UCT_RC_IFACE_SEND_OP_FLAG_IOV   = UCS_BIT(12), /* save iovec to make mem defined */
#endif
#if UCS_ENABLE_ASSERT
    UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = UCS_BIT(13), /* zcopy */
    UCT_RC_IFACE_SEND_OP_FLAG_IFACE = UCS_BIT(14), /* belongs to iface ops buffer */
    UCT_RC_IFACE_SEND_OP_FLAG_INUSE = UCS_BIT(15)  /* queued on a txqp */
#else
    UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY = 0,
    UCT_RC_IFACE_SEND_OP_FLAG_IFACE = 0,
    UCT_RC_IFACE_SEND_OP_FLAG_INUSE = 0
#endif
};


/* Callback type stored in the 'handler' field of uct_rc_iface_send_op_t.
 * It receives the send operation and a read-only 'resp' pointer.
 * NOTE(review): 'resp' presumably points at response data of the completed
 * operation - confirm against the handler implementations. */
typedef void (*uct_rc_send_handler_t)(uct_rc_iface_send_op_t *op, const void *resp);


/**
 * RC network header.
 * Packed and a single byte long; the helpers in this header place the data
 * that follows it at (header + 1).
 */
typedef struct uct_rc_hdr {
    uint8_t           am_id;     /* Active message ID */
} UCS_S_PACKED uct_rc_hdr_t;


/* Pending request extended with an endpoint pointer; this is the request type
 * taken by the fc_ctrl operation (uct_rc_iface_fc_ctrl_func_t). */
typedef struct uct_rc_pending_req {
    uct_pending_req_t super;
    uct_ep_t          *ep; /* NOTE(review): presumably the endpoint the request
                              belongs to - confirm against users */
} uct_rc_pending_req_t;


/**
 * RC fence type.
 * UCT_RC_FENCE_MODE_LAST is the number of modes that precede it.
 * NOTE(review): the exact meaning of NONE/WEAK/AUTO is defined by the fence
 * implementation outside this header - confirm there.
 */
typedef enum uct_rc_fence_mode {
    UCT_RC_FENCE_MODE_NONE,
    UCT_RC_FENCE_MODE_WEAK,
    UCT_RC_FENCE_MODE_AUTO,
    UCT_RC_FENCE_MODE_LAST
} uct_rc_fence_mode_t;


/* Common configuration used for rc verbs, rcx and dc transports */
typedef struct uct_rc_iface_common_config {
    uct_ib_iface_config_t    super;
    unsigned                 max_rd_atomic;
    int                      ooo_rw; /* Enable out-of-order RDMA data placement */
    int                      fence_mode; /* NOTE(review): presumably one of
                                            uct_rc_fence_mode_t - confirm */

    /* Transmit-side settings */
    struct {
        double               timeout;
        unsigned             retry_count;
        double               rnr_timeout;
        unsigned             rnr_retry_count;
        size_t               max_get_zcopy;
        size_t               max_get_bytes;
        int                  poll_always;
    } tx;

    /* Flow-control settings */
    struct {
        int                  enable;
        double               hard_thresh;
        unsigned             wnd_size;
    } fc;
} uct_rc_iface_common_config_t;


/* RC specific configuration used for rc verbs and rcx transports only.
 * Extends uct_rc_iface_common_config_t, which is embedded as 'super'. */
struct uct_rc_iface_config {
    uct_rc_iface_common_config_t   super;
    double                         soft_thresh;
    unsigned                       tx_cq_moderation; /* How many TX messages are
                                                        batched to one CQE */
    unsigned                       tx_cq_len;
    unsigned                       log_ack_req_freq; /* Log of requests ack
                                                        frequency on DevX */
};


/* QP TX cleanup context.
 * Passed to the cleanup_qp operation (uct_rc_iface_qp_cleanup_func_t). */
typedef struct {
    uct_ib_async_event_wait_t super;      /* LAST_WQE event callback */
    uct_rc_iface_t            *iface;     /* interface */
    ucs_list_link_t           list;       /* entry in interface ep_gc_list */
    uint32_t                  qp_num;     /* QP number to clean up */
    uint16_t                  cq_credits; /* how many CQ credits to release */
} uct_rc_iface_qp_cleanup_ctx_t;


/* Function pointer types of the RC-specific interface operations; each one is
 * the type of the same-purpose member of uct_rc_iface_ops_t. */

/* Type of the 'init_rx' operation */
typedef ucs_status_t
(*uct_rc_iface_init_rx_func_t)(uct_rc_iface_t *iface,
                               const uct_rc_iface_common_config_t *config);

/* Type of the 'cleanup_rx' operation */
typedef void (*uct_rc_iface_cleanup_rx_func_t)(uct_rc_iface_t *iface);

/* Type of the 'fc_ctrl' operation, invoked through uct_rc_fc_ctrl() */
typedef ucs_status_t (*uct_rc_iface_fc_ctrl_func_t)(uct_ep_t *ep, unsigned op,
                                                    uct_rc_pending_req_t *req);

/* Type of the 'fc_handler' operation; uct_rc_iface_fc_handler() declared in
 * this header has the same parameter list */
typedef ucs_status_t (*uct_rc_iface_fc_handler_func_t)(uct_rc_iface_t *iface,
                                                       unsigned qp_num,
                                                       uct_rc_hdr_t *hdr,
                                                       unsigned length,
                                                       uint32_t imm_data,
                                                       uint16_t lid,
                                                       unsigned flags);

/* Type of the 'cleanup_qp' operation */
typedef void (*uct_rc_iface_qp_cleanup_func_t)(
        uct_rc_iface_qp_cleanup_ctx_t *cleanup_ctx);


/* Type of the 'ep_post_check' operation */
typedef void (*uct_rc_iface_ep_post_check_func_t)(uct_ep_h tl_ep);


/* Type of the 'ep_vfs_populate' operation */
typedef void (*uct_rc_iface_ep_vfs_populate_func_t)(uct_rc_ep_t *rc_ep);


/* RC interface operations table: the IB operations ('super') followed by the
 * RC-specific hooks. uct_rc_fc_ctrl() obtains this table from the interface's
 * 'super.ops' pointer. */
typedef struct uct_rc_iface_ops {
    uct_ib_iface_ops_t                  super;
    uct_rc_iface_init_rx_func_t         init_rx;
    uct_rc_iface_cleanup_rx_func_t      cleanup_rx;
    uct_rc_iface_fc_ctrl_func_t         fc_ctrl;
    uct_rc_iface_fc_handler_func_t      fc_handler;
    uct_rc_iface_qp_cleanup_func_t      cleanup_qp;
    uct_rc_iface_ep_post_check_func_t   ep_post_check;
    uct_rc_iface_ep_vfs_populate_func_t ep_vfs_populate;
} uct_rc_iface_ops_t;


/* Receive-queue accounting, embedded as 'rx.srq' in struct uct_rc_iface.
 * NOTE(review): presumably 'available' counts receives that may still be
 * posted and 'quota' bounds them - confirm against the RX code. */
typedef struct uct_rc_srq {
    unsigned                 available;
    unsigned                 quota;
} uct_rc_srq_t;


/*
 * RC interface: TX resources and credits, RX pool and SRQ accounting, cached
 * configuration values, the QP-number -> endpoint lookup table and the
 * progress callback.
 */
struct uct_rc_iface {
    uct_ib_iface_t              super;

    struct {
        ucs_mpool_t             mp;         /* pool for send descriptors */
        ucs_mpool_t             pending_mp; /* pool for FC grant and keepalive
                                               pending requests */
        ucs_mpool_t             send_op_mp; /* pool for send_op completions */
        /* Credits for completions.
         * May be negative in case of mlx5 because we take "num_bb" credits
         * per post to be able to calculate credits of outstanding ops on
         * failure. In case of verbs TL we use WQE number, so 1 post always
         * takes 1 credit */
        signed                  cq_available;
        /* RDMA_READ credits: 'reads_completed' accumulates credits freed by
         * completions until uct_rc_iface_update_reads() moves them into
         * 'reads_available' */
        ssize_t                 reads_available;
        ssize_t                 reads_completed;
        uct_rc_iface_send_op_t  *free_ops; /* stack of free send operations */
        ucs_arbiter_t           arbiter;
        uct_rc_iface_send_op_t  *ops_buffer;
        uct_ib_fence_info_t     fi;
#if UCS_ENABLE_ASSERT
        /* set while a pending callback runs, see
         * uct_rc_iface_invoke_pending_cb() */
        int                     in_pending;
#endif
    } tx;

    struct {
        ucs_mpool_t          mp;
        uct_rc_srq_t         srq;
    } rx;

    struct {
        unsigned             tx_qp_len;
        unsigned             tx_min_sge;
        unsigned             tx_min_inline;
        unsigned             tx_ops_count;
        uint16_t             tx_moderation;
        uint8_t              tx_poll_always;

        /* Threshold to send "soft" FC credit request. The peer will try to
         * piggy-back credits grant to the counter AM, if any. */
        int16_t              fc_soft_thresh;

        /* Threshold to send "hard" credits request. The peer will grant
         * credits in a separate AM as soon as it handles this request. */
        int16_t              fc_hard_thresh;

        uint16_t             fc_wnd_size;
        uint8_t              fc_enabled;

        uint8_t              min_rnr_timer;
        uint8_t              timeout;
        uint8_t              rnr_retry;
        uint8_t              retry_cnt;
        uint8_t              max_rd_atomic;
        /* Enable out-of-order RDMA data placement */
        uint8_t              ooo_rw;
#if UCS_ENABLE_ASSERT
        /* only read by the assertion in
         * uct_rc_iface_add_cq_credits_dispatch() within this header */
        int                  tx_cq_len;
#endif
        uct_rc_fence_mode_t  fence_mode;
        unsigned             exp_backoff;
        size_t               max_get_zcopy;

        /* Atomic callbacks */
        uct_rc_send_handler_t  atomic64_handler;      /* 64bit ib-spec */
        uct_rc_send_handler_t  atomic32_ext_handler;  /* 32bit extended */
        uct_rc_send_handler_t  atomic64_ext_handler;  /* 64bit extended */
    } config;

    UCS_STATS_NODE_DECLARE(stats)

    /* Two-level table indexed by QP number, see uct_rc_iface_lookup_ep() */
    uct_rc_ep_t              **eps[UCT_RC_QP_TABLE_SIZE];
    ucs_list_link_t          ep_list;
    ucs_list_link_t          qp_gc_list;

    /* Progress function (either regular or TM aware) */
    ucs_callback_t           progress;
};
/* Class declaration for uct_rc_iface_t.
 * NOTE(review): the types after the class name are presumably the constructor
 * argument types - confirm against the UCS class framework. */
UCS_CLASS_DECLARE(uct_rc_iface_t, uct_iface_ops_t*, uct_rc_iface_ops_t*,
                  uct_md_h, uct_worker_h, const uct_iface_params_t*,
                  const uct_rc_iface_common_config_t*,
                  const uct_ib_iface_init_attr_t*);


/*
 * Send operation record. The first anonymous union overlays the queue link,
 * the free-list link and the purge status, so only one of them is meaningful
 * at a time; the second union overlays the per-operation context pointer.
 */
struct uct_rc_iface_send_op {
    union {
        ucs_queue_elem_t          queue;  /* used when enqueued on a txqp */
        uct_rc_iface_send_op_t    *next;  /* used when on free list */
        ucs_status_t              status; /* used when purging outstanding */
    };
    uct_rc_send_handler_t         handler;
    uint16_t                      sn;
    uint16_t                      flags;  /* UCT_RC_IFACE_SEND_OP_* flags */
    unsigned                      length;
    union {
        void                      *buffer;     /* atomics / desc /
                                                  FC_PURE_GRANT request */
        void                      *unpack_arg; /* get_bcopy / desc */
        uct_rc_iface_t            *iface;      /* should not be used with
                                                  get_bcopy completions */
        uct_ep_h                  ep;          /* ep on which we sent ep_check */
    };
    uct_completion_t              *user_comp;
#ifndef NVALGRIND
    struct iovec                  *iov;        /* get_zcopy with valgrind */
#endif

#if ENABLE_DEBUG_DATA
    const char                    *name;       /* object ID, debug only */
#endif
};


/* Send descriptor: a send operation ('super') plus the unpack callback set by
 * UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC and a memory key. The macros in this
 * header place headers and payload directly after this structure (desc + 1). */
struct uct_rc_iface_send_desc {
    uct_rc_iface_send_op_t        super;
    uct_unpack_callback_t         unpack_cb;
    uint32_t                      lkey;
};


/*
 * Short active message header (active message header is always 64 bit).
 * Packed: the RC header is immediately followed by the 64-bit AM header.
 */
typedef struct uct_rc_am_short_hdr {
    uct_rc_hdr_t      rc_hdr;
    uint64_t          am_hdr;
} UCS_S_PACKED uct_rc_am_short_hdr_t;


/* Configuration field tables, defined outside this header.
 * NOTE(review): presumably they describe uct_rc_iface_config_t and
 * uct_rc_iface_common_config_t respectively - confirm at the definitions. */
extern ucs_config_field_t uct_rc_iface_config_table[];
extern ucs_config_field_t uct_rc_iface_common_config_table[];

/* The functions below are only declared here; their definitions are outside
 * this header. */

unsigned uct_rc_iface_do_progress(uct_iface_h tl_iface);

ucs_status_t uct_rc_iface_query(uct_rc_iface_t *iface,
                                uct_iface_attr_t *iface_attr,
                                size_t put_max_short, size_t max_inline,
                                size_t am_max_hdr, size_t am_max_iov,
                                size_t am_min_hdr, size_t rma_max_iov);

/* NOTE(review): add/remove presumably maintain the 'eps' table that
 * uct_rc_iface_lookup_ep() reads - confirm at the definitions. */
void uct_rc_iface_add_qp(uct_rc_iface_t *iface, uct_rc_ep_t *ep,
                         unsigned qp_num);

void uct_rc_iface_remove_qp(uct_rc_iface_t *iface, unsigned qp_num);

ucs_status_t uct_rc_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp);

void uct_rc_iface_send_desc_init(uct_iface_h tl_iface, void *obj, uct_mem_h memh);

/* Completion handler installed by uct_rc_zcopy_desc_set_comp() when a user
 * completion is supplied */
void uct_rc_ep_am_zcopy_handler(uct_rc_iface_send_op_t *op, const void *resp);

void uct_rc_iface_cleanup_qps(uct_rc_iface_t *iface);

unsigned uct_rc_iface_qp_cleanup_progress(void *arg);


/**
 * Creates an RC or DCI QP
 */
ucs_status_t uct_rc_iface_qp_create(uct_rc_iface_t *iface, struct ibv_qp **qp_p,
                                    uct_ib_qp_attr_t *attr, unsigned max_send_wr,
                                    struct ibv_srq *srq);

void uct_rc_iface_fill_attr(uct_rc_iface_t *iface,
                            uct_ib_qp_attr_t *qp_init_attr,
                            unsigned max_send_wr,
                            struct ibv_srq *srq);

ucs_status_t uct_rc_iface_qp_init(uct_rc_iface_t *iface, struct ibv_qp *qp);

ucs_status_t uct_rc_iface_qp_connect(uct_rc_iface_t *iface, struct ibv_qp *qp,
                                     const uint32_t qp_num,
                                     struct ibv_ah_attr *ah_attr,
                                     enum ibv_mtu path_mtu);

/* Has the parameter list of uct_rc_iface_fc_handler_func_t */
ucs_status_t uct_rc_iface_fc_handler(uct_rc_iface_t *iface, unsigned qp_num,
                                     uct_rc_hdr_t *hdr, unsigned length,
                                     uint32_t imm_data, uint16_t lid, unsigned flags);

ucs_status_t uct_rc_init_fc_thresh(uct_rc_iface_config_t *rc_cfg,
                                   uct_rc_iface_t *iface);

ucs_status_t uct_rc_iface_event_arm(uct_iface_h tl_iface, unsigned events);

ucs_status_t uct_rc_iface_common_event_arm(uct_iface_h tl_iface,
                                           unsigned events, int force_rx_all);

ucs_status_t uct_rc_iface_init_rx(uct_rc_iface_t *iface,
                                  const uct_rc_iface_common_config_t *config,
                                  struct ibv_srq **p_srq);

/* Called by uct_rc_iface_fence_relaxed_order() with flags == 0 */
ucs_status_t uct_rc_iface_fence(uct_iface_h tl_iface, unsigned flags);

void uct_rc_iface_vfs_populate(uct_rc_iface_t *iface);

void uct_rc_iface_vfs_refresh(uct_iface_h iface);

/* Arbiter callback passed to ucs_arbiter_dispatch() by
 * uct_rc_iface_add_cq_credits_dispatch() */
ucs_arbiter_cb_result_t
uct_rc_ep_process_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
                          ucs_arbiter_elem_t *elem, void *arg);

/*
 * Forward a flow-control request to the 'fc_ctrl' hook of the RC operations
 * table of the interface that owns 'ep'.
 *
 * @return Whatever status the hook returns.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_fc_ctrl(uct_ep_t *ep, unsigned op, uct_rc_pending_req_t *req)
{
    uct_rc_iface_t *rc_iface;
    uct_rc_iface_ops_t *rc_ops;

    rc_iface = ucs_derived_of(ep->iface, uct_rc_iface_t);
    rc_ops   = ucs_derived_of(rc_iface->super.ops, uct_rc_iface_ops_t);

    return rc_ops->fc_ctrl(ep, op, req);
}

/*
 * Look up the endpoint registered for a QP number in the two-level 'eps'
 * table. The first-level index is the QP number shifted right by
 * UCT_RC_QP_TABLE_ORDER; the second-level index is the QP number masked with
 * UCT_RC_QP_TABLE_MEMB_ORDER bits.
 * The first-level entry is dereferenced without a NULL check.
 */
static inline uct_rc_ep_t *uct_rc_iface_lookup_ep(uct_rc_iface_t *iface,
                                                  unsigned qp_num)
{
    unsigned table_idx;
    unsigned memb_idx;

    ucs_assert(qp_num < UCS_BIT(UCT_IB_QPN_ORDER));

    table_idx = qp_num >> UCT_RC_QP_TABLE_ORDER;
    memb_idx  = qp_num & UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER);

    return iface->eps[table_idx][memb_idx];
}


/*
 * Return non-zero when the interface has at least one TX completion credit
 * (tx.cq_available is signed and may be zero or negative).
 */
static UCS_F_ALWAYS_INLINE int
uct_rc_iface_have_tx_cqe_avail(uct_rc_iface_t* iface)
{
    return iface->tx.cq_available > 0;
}

/**
 * Hand the accumulated RDMA_READ credits back to the RC iface.
 * Completion callbacks only count freed read credits in tx.reads_completed;
 * they are not made available right away in order to avoid out-of-order
 * sends. Otherwise, when a read credit is the only missing resource and a
 * completion callback released it, the next completion callback could send
 * although the pending queue is not empty.
 * This function moves the counted credits into tx.reads_available and resets
 * the counter.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_iface_update_reads(uct_rc_iface_t *iface)
{
    ssize_t completed;

    ucs_assert(iface->tx.reads_completed >= 0);

    completed                  = iface->tx.reads_completed;
    iface->tx.reads_completed  = 0;
    iface->tx.reads_available += completed;
}

/*
 * Return 'cq_credits' TX completion credits to the interface, assert that the
 * total does not exceed the configured TX CQ length, and then run the pending
 * arbiter with uct_rc_ep_process_pending as the callback.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_iface_add_cq_credits_dispatch(uct_rc_iface_t *iface, uint16_t cq_credits)
{
    iface->tx.cq_available = iface->tx.cq_available + cq_credits;

    ucs_assertv(iface->tx.cq_available <= iface->config.tx_cq_len,
                "cq_available=%d tx_cq_len=%d cq_credits=%d",
                iface->tx.cq_available, iface->config.tx_cq_len, cq_credits);

    /* credits were added, so pending work gets a chance to run */
    ucs_arbiter_dispatch(&iface->tx.arbiter, 1, uct_rc_ep_process_pending,
                         NULL);
}

/*
 * Pop a send operation from the interface's stack of free operations
 * (tx.free_ops).
 *
 * The stack must not be empty: the head is dereferenced to advance the stack.
 * The assertion makes a violation of that precondition visible in
 * assert-enabled builds instead of failing on a NULL dereference.
 *
 * @return The operation that was on top of the stack.
 */
static UCS_F_ALWAYS_INLINE uct_rc_iface_send_op_t*
uct_rc_iface_get_send_op(uct_rc_iface_t *iface)
{
    uct_rc_iface_send_op_t *op;

    op = iface->tx.free_ops;
    ucs_assert(op != NULL);

    iface->tx.free_ops = op->next;
    return op;
}

/*
 * Push a send operation back onto the free-operations stack of the interface
 * stored in op->iface. The operation is expected to carry
 * UCT_RC_IFACE_SEND_OP_FLAG_IFACE (checked by assertion).
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_iface_put_send_op(uct_rc_iface_send_op_t *op)
{
    /* 'iface' and 'next' live in different unions of the operation, so the
     * stack head can be located before the link is overwritten */
    uct_rc_iface_send_op_t **free_head = &op->iface->tx.free_ops;

    ucs_assert(op->flags & UCT_RC_IFACE_SEND_OP_FLAG_IFACE);

    op->next   = *free_head;
    *free_head = op;
}

/*
 * Fill an RC network header: stores the active message id in it.
 * Used as the header-packing step by uct_rc_zcopy_desc_set_header().
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_am_hdr_fill(uct_rc_hdr_t *rch, uint8_t id)
{
    rch->am_id = id;
}

/*
 * Set up completion handling of a zero-copy descriptor.
 *
 * With a user completion, the descriptor gets the AM zcopy handler, remembers
 * the completion, and *send_flags is set to IBV_SEND_SIGNALED. Without one,
 * the handler is ucs_mpool_put and *send_flags is set to 0; user_comp is left
 * untouched in that case.
 */
static inline void uct_rc_zcopy_desc_set_comp(uct_rc_iface_send_desc_t *desc,
                                              uct_completion_t *comp,
                                              int *send_flags)
{
    if (comp != NULL) {
        desc->super.handler   = uct_rc_ep_am_zcopy_handler;
        desc->super.user_comp = comp;
        *send_flags           = IBV_SEND_SIGNALED;
        return;
    }

    desc->super.handler = (uct_rc_send_handler_t)ucs_mpool_put;
    *send_flags         = 0;
}

/*
 * Write the header part of a zero-copy AM: the RC header with the given id,
 * immediately followed by a copy of 'header_length' bytes of the user header.
 */
static inline void uct_rc_zcopy_desc_set_header(uct_rc_hdr_t *rch,
                                                uint8_t id, const void *header,
                                                unsigned header_length)
{
    void *user_hdr = rch + 1; /* user header goes right behind the RC header */

    uct_rc_am_hdr_fill(rch, id);
    memcpy(user_hdr, header, header_length);
}

/*
 * Return non-zero when all interface-level TX resources are present: a CQ
 * credit, a descriptor in the TX memory pool and an RDMA_READ credit.
 */
static inline int uct_rc_iface_has_tx_resources(uct_rc_iface_t *iface)
{
    if (!uct_rc_iface_have_tx_cqe_avail(iface)) {
        return 0;
    }

    if (ucs_mpool_is_empty(&iface->tx.mp)) {
        return 0;
    }

    return iface->tx.reads_available > 0;
}

/*
 * Select the atomic completion handler for an operand size.
 *
 * @param ext     Only consulted for 64-bit operands: non-zero selects the
 *                extended handler, zero the ib-spec one. 32-bit operands
 *                always use the 32-bit extended handler.
 * @param length  Operand size in bytes; must be 4 or 8 (asserted).
 *
 * @return The configured handler, or NULL for any other length.
 */
static UCS_F_ALWAYS_INLINE uct_rc_send_handler_t
uct_rc_iface_atomic_handler(uct_rc_iface_t *iface, int ext, unsigned length)
{
    ucs_assert((length == sizeof(uint32_t)) || (length == sizeof(uint64_t)));

    if (length == sizeof(uint32_t)) {
        return iface->config.atomic32_ext_handler;
    }

    if (length == sizeof(uint64_t)) {
        if (ext) {
            return iface->config.atomic64_ext_handler;
        }
        return iface->config.atomic64_handler;
    }

    return NULL;
}

/*
 * Issue an interface fence only when the memory domain has relaxed ordering
 * enabled; otherwise do nothing.
 * The interface is expected to use uct_rc_iface_fence as its iface_fence
 * operation (asserted).
 *
 * @return UCS_OK when relaxed ordering is off, otherwise the status of
 *         uct_rc_iface_fence(tl_iface, 0).
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_iface_fence_relaxed_order(uct_iface_h tl_iface)
{
    uct_base_iface_t *base_iface = ucs_derived_of(tl_iface, uct_base_iface_t);
    uct_ib_md_t *ib_md           = ucs_derived_of(base_iface->md, uct_ib_md_t);

    ucs_assert(tl_iface->ops.iface_fence == uct_rc_iface_fence);

    if (md_relaxed_order_enabled(ib_md)) {
        return uct_rc_iface_fence(tl_iface, 0);
    }

    return UCS_OK;
}

/*
 * Debug check: asserts that either a pending callback is currently running
 * (tx.in_pending, set by uct_rc_iface_invoke_pending_cb()) or the given
 * arbiter group is empty.
 * tx.in_pending only exists when UCS_ENABLE_ASSERT is set, so this relies on
 * ucs_assert() not evaluating its argument otherwise.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_iface_check_pending(uct_rc_iface_t *iface, ucs_arbiter_group_t *arb_group)
{
    ucs_assert(iface->tx.in_pending || ucs_arbiter_group_is_empty(arb_group));
}

/*
 * Run the callback of a pending request and return its status.
 * In assert-enabled builds tx.in_pending is raised for the duration of the
 * callback; uct_rc_iface_check_pending() reads that marker. The request and
 * the returned status are traced.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_iface_invoke_pending_cb(uct_rc_iface_t *iface, uct_pending_req_t *req)
{
    ucs_status_t cb_status;

    ucs_trace_data("progressing pending request %p", req);

#if UCS_ENABLE_ASSERT
    iface->tx.in_pending = 1;
#endif
    cb_status = req->func(req);
#if UCS_ENABLE_ASSERT
    iface->tx.in_pending = 0;
#endif

    ucs_trace_data("status returned from progress pending: %s",
                   ucs_status_string(cb_status));
    return cb_status;
}

/*
 * Decide whether the TX side should be polled: returns 1 when 'count' is zero
 * or when the interface is configured to always poll TX, otherwise 0.
 * NOTE(review): 'count' is presumably the number of completions already
 * processed in this progress iteration - confirm at the call sites.
 */
static UCS_F_ALWAYS_INLINE int
uct_rc_iface_poll_tx(uct_rc_iface_t *iface, unsigned count)
{
    if (count == 0) {
        return 1;
    }

    return iface->config.tx_poll_always != 0;
}

/*
 * Attach a debug name to a send operation. The 'name' field exists only when
 * ENABLE_DEBUG_DATA is set; otherwise this function does nothing.
 * Only the pointer is stored, the string is not copied.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_iface_send_op_set_name(uct_rc_iface_send_op_t *op, const char *name)
{
#if ENABLE_DEBUG_DATA
    op->name = name;
#endif
}

#endif