ucx1-sys 0.1.0

Rust FFI bindings to UCX.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2020.  ALL RIGHTS RESERVED.
 * Copyright (C) Los Alamos National Security, LLC. 2019 ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifndef UCP_EP_H_
#define UCP_EP_H_

#include "ucp_types.h"

#include <ucp/proto/lane_type.h>
#include <ucp/proto/proto_select.h>
#include <ucp/wireup/ep_match.h>
#include <ucp/api/ucp.h>
#include <uct/api/uct.h>
#include <uct/api/v2/uct_v2.h>
#include <ucs/datastruct/queue.h>
#include <ucs/datastruct/ptr_map.h>
#include <ucs/datastruct/strided_alloc.h>
#include <ucs/debug/assert.h>
#include <ucs/stats/stats.h>


/* Length of the per-iov-count zero-copy threshold arrays (zcopy_thresh and
 * sync_zcopy_thresh in ucp_ep_msg_config_t). NOTE(review): presumably also the
 * maximal iov count the protocols handle — confirm. */
#define UCP_MAX_IOV                16UL


/* Endpoint flags type (see the endpoint flags enum below).
 * The DEBUG flag bits start at bit 16, so they only fit in the 32-bit variant
 * used when debug data or assertions are enabled; other builds use 16 bits,
 * presumably to keep ucp_ep_t small. NOTE(review): assumes DEBUG bits are only
 * set in such builds — confirm at the use sites. */
#if ENABLE_DEBUG_DATA || UCS_ENABLE_ASSERT
typedef uint32_t                   ucp_ep_flags_t;
#else
typedef uint16_t                   ucp_ep_flags_t;
#endif

/*
 * Assertion-build bookkeeping for unsigned in-progress counters (e.g. the
 * flush_iter_refcount / discard_refcount fields of ucp_ep_t).
 *
 * With UCS_ENABLE_ASSERT the macros check for overflow/underflow and then
 * increment/decrement the counter that _counter points to. The pointer
 * expression is evaluated exactly once. Without UCS_ENABLE_ASSERT they expand
 * to an empty statement and _counter is not evaluated at all.
 *
 * @param _counter  Pointer to an unsigned counter.
 */
#if UCS_ENABLE_ASSERT
#define UCP_EP_ASSERT_COUNTER_INC(_counter) \
    do { \
        unsigned *_ucp_counter_p = (_counter); \
        ucs_assert(*_ucp_counter_p < UINT_MAX); \
        ++(*_ucp_counter_p); \
    } while (0)

#define UCP_EP_ASSERT_COUNTER_DEC(_counter) \
    do { \
        unsigned *_ucp_counter_p = (_counter); \
        ucs_assert(*_ucp_counter_p > 0); \
        --(*_ucp_counter_p); \
    } while (0)
#else
/* Expand to a real (empty) statement, consistent with the assert variants and
 * free of -Wempty-body warnings in "if (c) UCP_EP_ASSERT_COUNTER_INC(p);" */
#define UCP_EP_ASSERT_COUNTER_INC(_counter) do { } while (0)
#define UCP_EP_ASSERT_COUNTER_DEC(_counter) do { } while (0)
#endif


/* Bit position of the version field inside the one-byte sa_data header of
 * ucp_wireup_sockaddr_data_base_t: the version occupies the 3 most significant
 * bits and the flags the 5 least significant bits. */
#define UCP_SA_DATA_HEADER_VERSION_SHIFT 5


/**
 * Endpoint flags, stored in ucp_ep_t::flags (type ucp_ep_flags_t).
 *
 * Bits 0-14 are used in every build; bits 8 and 15 are currently unassigned.
 * Bits 16 and above are DEBUG bookkeeping bits. They fit only in the 32-bit
 * ucp_ep_flags_t selected when ENABLE_DEBUG_DATA or UCS_ENABLE_ASSERT is set.
 */
enum {
    UCP_EP_FLAG_LOCAL_CONNECTED        = UCS_BIT(0), /* All local endpoints are connected,
                                                        for CM case - local address was packed,
                                                        UCT did not report errors during
                                                        connection establishment protocol
                                                        and disconnect not called yet */
    UCP_EP_FLAG_REMOTE_CONNECTED       = UCS_BIT(1), /* All remote endpoints are connected */
    UCP_EP_FLAG_CONNECT_REQ_QUEUED     = UCS_BIT(2), /* Connection request was queued */
    UCP_EP_FLAG_FAILED                 = UCS_BIT(3), /* EP is in failed state */
    UCP_EP_FLAG_USED                   = UCS_BIT(4), /* EP is in use by the user */
    UCP_EP_FLAG_STREAM_HAS_DATA        = UCS_BIT(5), /* EP has data in the ext.stream.match_q */
    UCP_EP_FLAG_ON_MATCH_CTX           = UCS_BIT(6), /* EP is on match queue */
    UCP_EP_FLAG_REMOTE_ID              = UCS_BIT(7), /* remote ID is valid */
    UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED = UCS_BIT(9), /* Pre-Connection request was queued */
    UCP_EP_FLAG_CLOSED                 = UCS_BIT(10),/* EP was closed */
    UCP_EP_FLAG_CLOSE_REQ_VALID        = UCS_BIT(11),/* close protocol is started and
                                                        close_req is valid */
    UCP_EP_FLAG_ERR_HANDLER_INVOKED    = UCS_BIT(12),/* error handler was called */
    UCP_EP_FLAG_INTERNAL               = UCS_BIT(13),/* the internal EP which holds
                                                        temporary wireup configuration or
                                                        mem-type EP */
    UCP_EP_FLAG_INDIRECT_ID            = UCS_BIT(14),/* protocols on this endpoint will send
                                                        indirect endpoint id instead of pointer,
                                                        can be replaced with looking at local ID */

    /* DEBUG bits (require the 32-bit ucp_ep_flags_t) */
    UCP_EP_FLAG_CONNECT_REQ_SENT       = UCS_BIT(16),/* DEBUG: Connection request was sent */
    UCP_EP_FLAG_CONNECT_REP_SENT       = UCS_BIT(17),/* DEBUG: Connection reply was sent */
    UCP_EP_FLAG_CONNECT_ACK_SENT       = UCS_BIT(18),/* DEBUG: Connection ACK was sent */
    UCP_EP_FLAG_CONNECT_REQ_IGNORED    = UCS_BIT(19),/* DEBUG: Connection request was ignored */
    UCP_EP_FLAG_CONNECT_PRE_REQ_SENT   = UCS_BIT(20),/* DEBUG: Connection pre-request was sent */
    UCP_EP_FLAG_FLUSH_STATE_VALID      = UCS_BIT(21),/* DEBUG: flush_state is valid */
    UCP_EP_FLAG_DISCONNECTED_CM_LANE   = UCS_BIT(22),/* DEBUG: CM lane was disconnected, i.e.
                                                        @uct_ep_disconnect was called for CM EP */
    UCP_EP_FLAG_CLIENT_CONNECT_CB      = UCS_BIT(23),/* DEBUG: Client connect callback invoked */
    UCP_EP_FLAG_SERVER_NOTIFY_CB       = UCS_BIT(24),/* DEBUG: Server notify callback invoked */
    UCP_EP_FLAG_DISCONNECT_CB_CALLED   = UCS_BIT(25) /* DEBUG: Got disconnect notification */
};


/**
 * UCP endpoint statistics counters.
 * Tag-send counters are incremented by UCP_EP_STAT_TAG_OP(_ep, _op), which
 * pastes _op onto UCP_EP_STAT_TAG_TX_. UCP_EP_STAT_LAST is the counter count.
 */
enum {
    UCP_EP_STAT_TAG_TX_EAGER,
    UCP_EP_STAT_TAG_TX_EAGER_SYNC,
    UCP_EP_STAT_TAG_TX_RNDV,
    UCP_EP_STAT_LAST
};


/**
 * Endpoint init flags.
 * NOTE(review): presumably the values passed as the "ep_init_flags" argument
 * of the endpoint creation functions declared in this header — confirm.
 */
enum {
    UCP_EP_INIT_FLAG_MEM_TYPE          = UCS_BIT(0),  /**< Endpoint for local mem type transfers */
    UCP_EP_INIT_CREATE_AM_LANE         = UCS_BIT(1),  /**< Endpoint requires an AM lane */
    UCP_EP_INIT_CM_WIREUP_CLIENT       = UCS_BIT(2),  /**< Endpoint wireup protocol is based on CM,
                                                           client side */
    UCP_EP_INIT_CM_WIREUP_SERVER       = UCS_BIT(3),  /**< Endpoint wireup protocol is based on CM,
                                                           server side */
    UCP_EP_INIT_ERR_MODE_PEER_FAILURE  = UCS_BIT(4),  /**< Endpoint requires an
                                                           @ref UCP_ERR_HANDLING_MODE_PEER */
    UCP_EP_INIT_CM_PHASE               = UCS_BIT(5),  /**< Endpoint connection to a peer is on
                                                           CM phase */
    UCP_EP_INIT_FLAG_INTERNAL          = UCS_BIT(6),  /**< Endpoint for internal usage
                                                           (e.g. memtype, reply on keepalive) */
    UCP_EP_INIT_CONNECT_TO_IFACE_ONLY  = UCS_BIT(7),  /**< Select transports which
                                                           support CONNECT_TO_IFACE
                                                           mode only */
    UCP_EP_INIT_CREATE_AM_LANE_ONLY    = UCS_BIT(8)   /**< Endpoint requires an AM lane only */
};


/*
 * Increment the tag-send statistics counter UCP_EP_STAT_TAG_TX_<_op> of
 * endpoint _ep by one (_op is EAGER, EAGER_SYNC or RNDV).
 *
 * The body is wrapped in do/while(0), so "UCP_EP_STAT_TAG_OP(ep, EAGER);" is
 * exactly one statement. A trailing ';' inside the macro would add an extra
 * empty statement and break unbraced if/else bodies.
 */
#define UCP_EP_STAT_TAG_OP(_ep, _op) \
    do { \
        UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1); \
    } while (0)


/*
 * Per-lane part of the endpoint configuration key: the local resource used by
 * the lane, its remote (destination) memory domain and system device, and the
 * operation types the lane was selected for.
 */
typedef struct ucp_ep_config_key_lane {
    ucp_rsc_index_t      rsc_index; /* Resource index */
    ucp_md_index_t       dst_md_index; /* Destination memory domain index */
    ucs_sys_device_t     dst_sys_dev; /* Destination system device */
    uint8_t              path_index; /* Device path index */
    ucp_lane_type_mask_t lane_types; /* Which types of operations this lane
                                        was selected for */
    size_t               seg_size; /* Maximal fragment size which can be
                                      received by the peer */
} ucp_ep_config_key_lane_t;


/*
 * Endpoint configuration key.
 * This is filled by the transport selection logic, according to the local
 * resources and set of remote addresses. Lane index fields described as
 * "can be NULL" may hold a "no lane" value instead of a real lane index.
 */
struct ucp_ep_config_key {

    ucp_lane_index_t         num_lanes;       /* Number of active lanes */
    ucp_ep_config_key_lane_t lanes[UCP_MAX_LANES]; /* Active lanes */

    ucp_lane_index_t         am_lane;         /* Lane for AM (can be NULL) */
    ucp_lane_index_t         tag_lane;        /* Lane for tag matching offload (can be NULL) */
    ucp_lane_index_t         wireup_msg_lane; /* Lane for wireup messages (can be NULL) */
    ucp_lane_index_t         cm_lane;         /* Lane for holding a CM connection (can be NULL) */

    /* Lanes for remote memory access, sorted by priority, highest first */
    ucp_lane_index_t         rma_lanes[UCP_MAX_LANES];

    /* Lanes for high-bw memory access, sorted by priority, highest first */
    ucp_lane_index_t         rma_bw_lanes[UCP_MAX_LANES];

    /* Lane for obtaining remote memory pointer */
    ucp_lane_index_t         rkey_ptr_lane;

    /* Lanes for atomic operations, sorted by priority, highest first */
    ucp_lane_index_t         amo_lanes[UCP_MAX_LANES];

    /* Lanes for high-bw active messages, sorted by priority, highest first */
    ucp_lane_index_t         am_bw_lanes[UCP_MAX_LANES];

    /* Local memory domains to send remote keys for in high-bw rma protocols
     * NOTE: potentially it can be different than what is imposed by rma_bw_lanes,
     * since these are the MDs used by remote side for accessing our memory. */
    ucp_md_map_t             rma_bw_md_map;

    /* Bitmap of remote mds which are reachable from this endpoint (with any set
     * of transports which could be selected in the future).
     */
    ucp_md_map_t             reachable_md_map;

    /* Array with popcount(reachable_md_map) elements, each entry holds the local
     * component index to be used for unpacking remote key from each set bit in
     * reachable_md_map */
    ucp_rsc_index_t          *dst_md_cmpts;

    /* Bitmap of lanes to ep_check keepalive operations. */
    ucp_lane_map_t           ep_check_map;

    /* Error handling mode */
    ucp_err_handling_mode_t  err_mode;
};


/*
 * Configuration for RMA protocols (one instance per lane, see
 * ucp_ep_config::rma). The *_short limits are signed (ssize_t), unlike the
 * other limits.
 */
typedef struct ucp_ep_rma_config {
    ssize_t                max_put_short;    /* Maximal payload of put short */
    size_t                 max_put_bcopy;    /* Maximal total size of put_bcopy */
    size_t                 max_put_zcopy;    /* Presumably maximal total size of
                                                put_zcopy — confirm */
    ssize_t                max_get_short;    /* Maximal payload of get short */
    size_t                 max_get_bcopy;    /* Maximal total size of get_bcopy */
    size_t                 max_get_zcopy;    /* Presumably maximal total size of
                                                get_zcopy — confirm */
    size_t                 put_zcopy_thresh; /* Presumably size from which put
                                                switches to zcopy — confirm */
    size_t                 get_zcopy_thresh; /* Presumably size from which get
                                                switches to zcopy — confirm */
} ucp_ep_rma_config_t;


/*
 * Configuration for AM and tag offload protocols (used for ucp_ep_config::am
 * and ucp_ep_config::tag.eager).
 * NOTE(review): the size limits are presumably derived from the capabilities
 * of the lane this configuration describes — confirm where it is filled.
 */
typedef struct ucp_ep_msg_config {
    ssize_t            max_short; /* Short-send payload limit (signed) */
    size_t             max_bcopy; /* Buffered-copy size limit */
    size_t             max_zcopy; /* Zero-copy size limit */
    size_t             max_hdr;   /* Header size limit */
    size_t             max_iov;   /* Limit on the number of iov entries */

    /* zero-copy threshold for operations which do not have to wait for remote
     * side; presumably indexed by iov count (length UCP_MAX_IOV) */
    size_t             zcopy_thresh[UCP_MAX_IOV];

    /* zero-copy threshold for mem type buffers, one entry per memory type */
    size_t             mem_type_zcopy_thresh[UCS_MEMORY_TYPE_LAST];

    /* zero-copy threshold for operations which anyways have to wait for remote
     * side; presumably indexed by iov count (length UCP_MAX_IOV) */
    size_t             sync_zcopy_thresh[UCP_MAX_IOV];
    uint8_t            zcopy_auto_thresh; /* if != 0, zcopy is enabled */
} ucp_ep_msg_config_t;


/*
 * Thresholds with and without non-host memory.
 * NOTE(review): presumably memtype_on applies when non-host memory types are
 * involved and memtype_off when only host memory is — confirm at the users.
 */
typedef struct ucp_memtype_thresh {
    ssize_t            memtype_on;
    ssize_t            memtype_off;
} ucp_memtype_thresh_t;


/*
 * Rendezvous thresholds. Two variants are kept because the best switch point
 * depends on whether the caller favors faster remote or faster local
 * completion.
 */
typedef struct ucp_rndv_thresh {
    /* threshold calculated assuming faster remote completion */
    size_t            remote;
    /* threshold calculated assuming faster local completion, for instance
     * when UCP_OP_ATTR_FLAG_FAST_CMP flag is provided to send operation
     * parameters */
    size_t            local;
} ucp_rndv_thresh_t;


/*
 * Rendezvous Zcopy configuration (used for both the GET and the PUT variant,
 * see ucp_ep_config::rndv).
 */
typedef struct ucp_rndv_zcopy {
    /* Maximal total size of Zcopy operation */
    size_t           max;
    /* Minimal size of Zcopy operation */
    size_t           min;
    /* Whether a message larger than the maximal size can be split into
     * segments which are at least the minimal size */
    int              split;
    /* Lanes for Zcopy operation */
    ucp_lane_index_t lanes[UCP_MAX_LANES];
    /* BW based scale factor for zcopy lanes */
    double           scale[UCP_MAX_LANES];
} ucp_ep_rndv_zcopy_config_t;


/*
 * Endpoint configuration: the protocol limits, thresholds and lane choices
 * derived from a ucp_ep_config_key_t. NOTE(review): endpoints presumably
 * refer to it through ucp_ep_t::cfg_index — confirm.
 */
struct ucp_ep_config {

    /* A key which uniquely defines the configuration; all other fields of the
     * configuration (in the current worker) are determined only by it.
     */
    ucp_ep_config_key_t     key;

    /* Bitmap of which lanes are p2p; affects the behavior of connection
     * establishment protocols.
     */
    ucp_lane_map_t          p2p_lanes;

    /* Configuration for each lane that provides RMA */
    ucp_ep_rma_config_t     rma[UCP_MAX_LANES];

    /* Threshold for switching from put_short to put_bcopy */
    size_t                  bcopy_thresh;

    /* Configuration for AM lane */
    ucp_ep_msg_config_t     am;

    /* MD index of each lane */
    ucp_md_index_t          md_index[UCP_MAX_LANES];

    struct {
        /* RNDV GET Zcopy configuration */
        ucp_ep_rndv_zcopy_config_t get_zcopy;
        /* RNDV PUT Zcopy configuration */
        ucp_ep_rndv_zcopy_config_t put_zcopy;
        /* Threshold for switching from eager to RMA based rendezvous */
        ucp_rndv_thresh_t          rma_thresh;
        /* Threshold for switching from eager to AM based rendezvous */
        ucp_rndv_thresh_t          am_thresh;
        /* Total size of packed rkey, according to high-bw md_map */
        size_t                     rkey_size;
        /* Remote memory domains which support rkey_ptr */
        ucp_md_map_t               rkey_ptr_dst_mds;
    } rndv;

    struct {
        /* Protocols used for tag matching operations
         * (can be AM based or tag offload). */
        const ucp_request_send_proto_t   *proto;
        const ucp_request_send_proto_t   *sync_proto;

        /* Lane used for tag matching operations. */
        ucp_lane_index_t     lane;

        /* Maximal size for eager short. */
        ucp_memtype_thresh_t max_eager_short;

        /* Configuration of the lane used for eager protocols
         * (can be AM or tag offload). */
        ucp_ep_msg_config_t  eager;

        /* Threshold for switching from eager to rendezvous. Can be different
         * from AM thresholds if tag offload is enabled and tag offload lane is
         * not the same as AM lane. */
        struct {
            ucp_rndv_thresh_t    rma_thresh;
            ucp_rndv_thresh_t    am_thresh;
        } rndv;

        struct {
            /* Maximal size for eager short. */
            ucp_memtype_thresh_t max_eager_short;

            /* Maximal iov count for RNDV offload */
            size_t          max_rndv_iov;
            /* Maximal total size for RNDV offload */
            size_t          max_rndv_zcopy;
        } offload;
    } tag;

    struct {
        /* Protocols used for stream operations
         * (currently it's only AM based). */
        const ucp_request_send_proto_t   *proto;
    } stream;

    struct {
        /* Protocols used for am operations */
        const ucp_request_send_proto_t   *proto;
        const ucp_request_send_proto_t   *reply_proto;

        /* Maximal size for eager short */
        ucp_memtype_thresh_t             max_eager_short;

        /* Maximal size for eager short with reply protocol */
        ucp_memtype_thresh_t             max_reply_eager_short;
    } am_u;

    /* Protocol selection data */
    ucp_proto_select_t            proto_select;
};


/**
 * Protocol layer endpoint, represents a connection to a remote worker.
 *
 * Holds the data needed on every operation. Generic non fast-path data,
 * protocol-specific data and control data live in the ucp_ep_ext_gen_t,
 * ucp_ep_ext_proto_t and ucp_ep_ext_control_t extensions. NOTE(review): how
 * the extensions are reached from a ucp_ep_t is not shown in this header.
 */
typedef struct ucp_ep {
    ucp_worker_h                  worker;        /* Worker this endpoint belongs to */

    uint8_t                       refcount;      /* Reference counter: 0 - it is
                                                    allowed to destroy EP */
    ucp_worker_cfg_index_t        cfg_index;     /* Configuration index */
    ucp_ep_match_conn_sn_t        conn_sn;       /* Sequence number for remote connection */
    ucp_lane_index_t              am_lane;       /* Cached value */
    ucp_ep_flags_t                flags;         /* Endpoint flags */

    /* TODO allocate ep dynamically according to number of lanes */
    uct_ep_h                      uct_eps[UCP_MAX_LANES]; /* Transports for every lane */

#if ENABLE_DEBUG_DATA
    char                          peer_name[UCP_WORKER_ADDRESS_NAME_MAX];
    /* Endpoint name for tracing and analysis */
    char                          name[UCP_ENTITY_NAME_MAX];
#endif

#if UCS_ENABLE_ASSERT
    /* How many Worker flush operations are in-progress where the EP is the next
     * EP for flushing */
    unsigned                      flush_iter_refcount;
    /* How many UCT EP discarding operations are in-progress scheduled for the
     * EP */
    unsigned                      discard_refcount;
#endif

    UCS_STATS_NODE_DECLARE(stats)

} ucp_ep_t;


/**
 * Status of protocol-level remote completions.
 * Tracks sent vs. completed operation sequence numbers together with the
 * flush requests waiting for remote completion. It is valid only while
 * UCP_EP_FLAG_FLUSH_STATE_VALID is set (a DEBUG-only flag bit).
 */
typedef struct {
    ucs_hlist_head_t reqs; /* Queue of flush requests which
                              are waiting for remote completion */
    uint32_t         send_sn; /* Sequence number of sent operations */
    uint32_t         cmpl_sn; /* Sequence number of completions */
} ucp_ep_flush_state_t;


/**
 * State of the endpoint close protocol: the flush request that drives it.
 * The request pointer is valid when UCP_EP_FLAG_CLOSE_REQ_VALID is set
 * ("close protocol is started and close_req is valid").
 */
typedef struct {
    ucp_request_t             *req;             /* Flush request which is
                                                   used in close protocol */
} ucp_ep_close_proto_req_t;


/**
 * Endpoint extension for control data path: connection-manager index, local
 * and remote endpoint IDs, the user error handler and close-protocol state.
 * It is referenced from ucp_ep_ext_gen_t::control_ext.
 */
typedef struct {
    ucp_rsc_index_t          cm_idx; /* CM index */
    ucs_ptr_map_key_t        local_ep_id; /* Local EP ID */
    ucs_ptr_map_key_t        remote_ep_id; /* Remote EP ID */
    ucp_err_handler_cb_t     err_cb; /* Error handler */
    ucp_ep_close_proto_req_t close_req; /* Close protocol request */
#if UCS_ENABLE_ASSERT
    ucs_time_t               ka_last_round; /* Time of last KA round done */
#endif
} ucp_ep_ext_control_t;


/**
 * Endpoint extension for generic non fast-path data
 */
typedef struct {
    void                          *user_data;    /* User data associated with ep */
    ucs_list_link_t               ep_list;       /* List entry in worker's all eps list */
    /* Endpoint match context and remote completion status are mutually exclusive,
     * since remote completions are counted only after the endpoint is already
     * matched to a remote peer. Which member is active is reflected by
     * UCP_EP_FLAG_ON_MATCH_CTX / UCP_EP_FLAG_FLUSH_STATE_VALID (the latter is
     * a DEBUG-only bit).
     */
    union {
        ucp_ep_match_elem_t       ep_match;      /* Matching with remote endpoints */
        ucp_ep_flush_state_t      flush_state;   /* Remote completion status */
    };
    ucp_ep_ext_control_t          *control_ext;  /* Control data path extension */
    /* List of requests which are waiting for remote completion */
    ucs_hlist_head_t              proto_reqs;
} ucp_ep_ext_gen_t;


/**
 * Endpoint extension for specific protocols (stream and active messages)
 */
typedef struct {
    struct {
        ucs_list_link_t           ready_list;    /* List entry in worker's EP list */
        ucs_queue_head_t          match_q;       /* Queue of receive data or requests,
                                                    depends on UCP_EP_FLAG_STREAM_HAS_DATA */
    } stream;

    struct {
        ucs_list_link_t           started_ams;   /* Presumably AM messages whose
                                                    reassembly has started — confirm */
        ucs_queue_head_t          mid_rdesc_q; /* queue of middle fragments, which
                                                  arrived before the first one */
    } am;
} ucp_ep_ext_proto_t;


/* Address modes for ucp_wireup_sockaddr_data_v1_t::addr_mode */
enum {
    UCP_WIREUP_SA_DATA_CM_ADDR   = UCS_BIT(1)  /* Sockaddr client data contains address
                                                  for CM based wireup: there is only
                                                  iface and ep address of transport
                                                  lanes, remote device address is
                                                  provided by CM and has to be added to
                                                  unpacked UCP address locally. */
};


/* Sockaddr data flags that are packed into the header field of the
 * ucp_wireup_sockaddr_data_base_t structure (the 5 least significant bits,
 * below the version field).
 */
enum {
    /* Indicates support of UCP_ERR_HANDLING_MODE_PEER error mode. */
    UCP_SA_DATA_FLAG_ERR_MODE_PEER = UCS_BIT(0)
};


/* Basic sockaddr data. Version 1 uses some additional fields which are not
 * really needed and were removed in version 2. The structure is packed
 * because it is exchanged on the wire.
 */
typedef struct ucp_wireup_sockaddr_data_base {
    uint64_t                  ep_id; /**< Endpoint ID */

    /* This field has different meaning for sa_data v1 and other versions:
     * v1:           it is error handling mode
     * v2 and newer: it is sa_data header with the following format:
     *   +---+-----+
     *   | 3 |  5  |
     *   +---+-----+
     *     v    |
     * version  |
     *          v
     *        flags
     *
     * i.e. the version starts at bit UCP_SA_DATA_HEADER_VERSION_SHIFT and the
     * flags (UCP_SA_DATA_FLAG_*) occupy the low 5 bits.
     *
     * It is safe to keep version in 3 MSB, because it will always be zeros
     * (i.e. UCP_OBJECT_VERSION_V1) in sa_data v1 (err_mode value is small).
     */
    uint8_t                   header;
    /* packed worker address (or sa_data v1) follows */
} UCS_S_PACKED ucp_wireup_sockaddr_data_base_t;


/* Sockaddr data, version 1: the base header followed by the address mode and
 * device index fields that later versions dropped. Packed (wire format). */
typedef struct ucp_wireup_sockaddr_data_v1 {
    ucp_wireup_sockaddr_data_base_t super;
    uint8_t                         addr_mode; /**< The attached address format
                                                    defined by
                                                    UCP_WIREUP_SA_DATA_xx */
    uint8_t                         dev_index; /**< Device address index used to
                                                    build remote address in
                                                    UCP_WIREUP_SA_DATA_CM_ADDR
                                                    mode */
    /* packed worker address follows */
} UCS_S_PACKED ucp_wireup_sockaddr_data_v1_t;


/* Incoming connection request received on a listener. The structure is
 * followed in memory by the sockaddr data and the packed worker address. */
typedef struct ucp_conn_request {
    ucp_listener_h              listener;
    uct_listener_h              uct_listener;
    uct_conn_request_h          uct_req;
    ucp_rsc_index_t             cm_idx;
    char                        dev_name[UCT_DEVICE_NAME_MAX];
    uct_device_addr_t           *remote_dev_addr;
    struct sockaddr_storage     client_address;
    ucp_ep_h                    ep; /* valid only if request is handled internally */
    /* sa_data and packed worker address follow */
} ucp_conn_request_t;


/**
 * Argument for discarding UCP endpoint's lanes (passed to
 * ucp_ep_discard_lanes() as discard_arg)
 */
typedef struct ucp_ep_discard_lanes_arg {
    unsigned     counter; /* How many discarding operations on UCT lanes are
                           * in-progress if purging of the UCP endpoint is
                           * required */
    ucs_status_t status; /* Completion status of operations after discarding is
                          * done */
    ucp_ep_h     ucp_ep; /* UCP endpoint which should be discarded */
} ucp_ep_discard_lanes_arg_t;


/* Configuration-key helpers and lane description formatting.
 * NOTE(review): the definitions are not part of this header; the short
 * descriptions below are inferred from the names and should be confirmed. */

/* Presumably returns nonzero if uct_ep is in a failed state — confirm. */
int ucp_is_uct_ep_failed(uct_ep_h uct_ep);

/* Resets *key (presumably to an empty configuration — confirm). */
void ucp_ep_config_key_reset(ucp_ep_config_key_t *key);

/* Append a description of CM lane `lane` of `key` to `buf`. */
void ucp_ep_config_cm_lane_info_str(ucp_worker_h worker,
                                    const ucp_ep_config_key_t *key,
                                    ucp_lane_index_t lane,
                                    ucp_rsc_index_t cm_index,
                                    ucs_string_buffer_t *buf);

/* Append a description of transport lane `lane` of `key` to `buf`. */
void ucp_ep_config_lane_info_str(ucp_worker_h worker,
                                 const ucp_ep_config_key_t *key,
                                 const unsigned *addr_indices,
                                 ucp_lane_index_t lane,
                                 ucp_rsc_index_t aux_rsc_index,
                                 ucs_string_buffer_t *buf);

/* Endpoint creation, reference counting, destruction and internal flush.
 * Created endpoints are returned through the ep_p output parameter. */
ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
                                const char *message, ucp_ep_h *ep_p);

/* Reference counting on ucp_ep_t::refcount (0 means the EP may be destroyed).
 * NOTE(review): presumably ucp_ep_remove_ref() returns nonzero when the count
 * drops to 0 — confirm. */
void ucp_ep_add_ref(ucp_ep_h ep);

int ucp_ep_remove_ref(ucp_ep_h ep);

ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, unsigned ep_init_flags,
                                  const char *peer_name, const char *message,
                                  ucp_ep_h *ep_p);

void ucp_ep_delete(ucp_ep_h ep);

void ucp_ep_release_id(ucp_ep_h ep);

ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
                                       ucp_wireup_ep_t **wireup_ep);

ucs_status_t
ucp_ep_create_to_worker_addr(ucp_worker_h worker,
                             const ucp_tl_bitmap_t *local_tl_bitmap,
                             const ucp_unpacked_address_t *remote_address,
                             unsigned ep_init_flags, const char *message,
                             ucp_ep_h *ep_p);

ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker,
                                         const ucp_conn_request_h conn_request,
                                         ucp_ep_h *ep_p);

/* Start a flush of the endpoint; flushed_cb is the completion hook and
 * debug_name labels the request. Returns a status or request pointer
 * (ucs_status_ptr_t). */
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
                                       const ucp_request_param_t *param,
                                       ucp_request_t *worker_req,
                                       ucp_request_callback_t flushed_cb,
                                       const char *debug_name);

ucs_status_t
ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags,
                           const ucp_unpacked_address_t *remote_address,
                           ucp_ep_h *ep_p);

/* Sets key->err_mode, presumably from UCP_EP_INIT_ERR_MODE_PEER_FAILURE in
 * ep_init_flags — confirm. */
void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
                                    unsigned ep_init_flags);

/* Pending-queue purge callbacks (uct_pending_purge_callback_t signature). */
void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg);

void ucp_destroyed_ep_pending_purge(uct_pending_req_t *self, void *arg);

/* Endpoint teardown and failure handling. */
void ucp_ep_disconnected(ucp_ep_h ep, int force);

void ucp_ep_destroy_internal(ucp_ep_h ep);

/* Move the endpoint to the failed state because of `status` on `lane`.
 * The _schedule variant presumably defers this work — confirm. */
ucs_status_t
ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status);

void ucp_ep_set_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
                                ucs_status_t status);

void ucp_ep_unprogress_uct_ep(ucp_ep_h ep, uct_ep_h uct_ep,
                              ucp_rsc_index_t rsc_index);

void ucp_ep_cleanup_lanes(ucp_ep_h ep);

/* Build (and release) a ucp_ep_config_t from a configuration key. */
ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
                                const ucp_ep_config_key_t *key);

void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config);

/* Configuration-key comparison helpers. */
int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
                                     ucp_lane_index_t lane1,
                                     const ucp_ep_config_key_t *key2,
                                     ucp_lane_index_t lane2);

/* Fills lane_map for the lanes of key1 and key2. NOTE(review): presumably
 * maps each lane of key1 to the matching lane of key2 — confirm. */
void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
                                   const ucp_rsc_index_t *dst_rsc_indices1,
                                   const ucp_ep_config_key_t *key2,
                                   const ucp_rsc_index_t *dst_rsc_indices2,
                                   ucp_lane_index_t *lane_map);

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
                           const ucp_ep_config_key_t *key2);

/* Priority of `lane` within a priority-sorted lanes array (such as
 * rma_lanes/amo_lanes, which are sorted highest first). */
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
                                      ucp_lane_index_t lane);

size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
                                           const ucs_linear_func_t *reg_cost,
                                           const ucp_context_h context,
                                           double bandwidth);

/* Worker-owned endpoints for local memory-type transfers
 * (presumably created with UCP_EP_INIT_FLAG_MEM_TYPE — confirm). */
ucs_status_t ucp_worker_mem_type_eps_create(ucp_worker_h worker);

void ucp_worker_mem_type_eps_destroy(ucp_worker_h worker);

void ucp_worker_mem_type_eps_print_info(ucp_worker_h worker, FILE *stream);

/* Helpers for connection-manager (CM) lanes and local connection state. */
ucp_wireup_ep_t *ucp_ep_get_cm_wireup_ep(ucp_ep_h ep);

void ucp_ep_get_tl_bitmap(ucp_ep_h ep, ucp_tl_bitmap_t *tl_bitmap);

uct_ep_h ucp_ep_get_cm_uct_ep(ucp_ep_h ep);

int ucp_ep_is_cm_local_connected(ucp_ep_h ep);

int ucp_ep_is_local_connected(ucp_ep_h ep);

unsigned ucp_ep_local_disconnect_progress(void *arg);

size_t ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config_t *config);

/* Finalize a rendezvous zcopy configuration for `lanes_count` lanes.
 * (Previously declared twice in this header; one declaration suffices.) */
void ucp_ep_config_rndv_zcopy_commit(ucp_lane_index_t lanes_count,
                                     ucp_ep_rndv_zcopy_config_t *rndv_zcopy);

void ucp_ep_get_lane_info_str(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
                              ucs_string_buffer_t *lane_info_strb);

void ucp_ep_invoke_err_cb(ucp_ep_h ep, ucs_status_t status);

/* Endpoint flush machinery (pending-queue progress and UCT completion). */
ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self);

void ucp_ep_flush_completion(uct_completion_t *self);

void ucp_ep_flush_request_ff(ucp_request_t *req, ucs_status_t status);

/* Purge the pending queues of the endpoint's lanes with purge_cb. */
void
ucp_ep_purge_lanes(ucp_ep_h ep, uct_pending_purge_callback_t purge_cb,
                   void *purge_arg);

/* Discard the endpoint's UCT lanes. discard_arg tracks the in-progress
 * discard operations (see ucp_ep_discard_lanes_arg_t). */
unsigned ucp_ep_discard_lanes(ucp_ep_h ep, ucs_status_t discard_status,
                              ucp_send_nbx_callback_t discard_cb,
                              ucp_ep_discard_lanes_arg_t *discard_arg);

void ucp_ep_register_disconnect_progress(ucp_request_t *req);

/* Find the lane of ucp_ep which uses uct_ep. */
ucp_lane_index_t ucp_ep_lookup_lane(ucp_ep_h ucp_ep, uct_ep_h uct_ep);

/**
 * @brief Do keepalive operation for a specific UCT EP.
 *
 * @param [in] ucp_ep  UCP Endpoint object to operate keepalive.
 * @param [in] uct_ep  UCT Endpoint object to do keepalive on.
 * @param [in] rsc_idx Resource index to check.
 * @param [in] flags   Flags for keepalive operation.
 * @param [in] comp    Pointer to keepalive completion object.
 *
 * @return Status of keepalive operation.
 */
ucs_status_t ucp_ep_do_uct_ep_keepalive(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
                                        ucp_rsc_index_t rsc_idx, unsigned flags,
                                        uct_completion_t *comp);

/**
 * @brief Do keepalive operation.
 *
 * The lanes subject to keepalive are described by the ep_check_map field of
 * the endpoint configuration key.
 *
 * @param [in] ep    UCP Endpoint object to operate keepalive.
 * @param [in] now   Current time when keepalive started.
 *
 * @return Indication whether keepalive was fully done for UCP Endpoint or not.
 */
int ucp_ep_do_keepalive(ucp_ep_h ep, ucs_time_t now);


/**
 * @brief Purge the protocol request scheduled on a given UCP endpoint.
 *
 * @param [in]     ucp_ep           Endpoint object on which the request should
 *                                  be purged.
 * @param [in]     req              The request to purge.
 * @param [in]     status           Completion status.
 * @param [in]     recursive        Indicates if the function was called from
 *                                  the @ref ucp_ep_req_purge recursively.
 */
void ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req,
                      ucs_status_t status, int recursive);


/**
 * @brief Purge flush and protocol requests scheduled on a given UCP endpoint.
 *
 * NOTE(review): presumably covers the flush requests in flush_state.reqs and
 * the protocol requests in proto_reqs of ucp_ep_ext_gen_t — confirm.
 *
 * @param [in]     ucp_ep           Endpoint object on which requests should be
 *                                  purged.
 * @param [in]     status           Completion status.
 */
void ucp_ep_reqs_purge(ucp_ep_h ucp_ep, ucs_status_t status);


/**
 * @brief Create objects in VFS to represent endpoint and its features.
 *
 * @param [in] ep Endpoint object to be described.
 */
void ucp_ep_vfs_init(ucp_ep_h ep);

#endif