async_ucx/
lib.rs

1//! Asynchronous Rust bindings to UCX.
2
3#![deny(warnings)]
4#![deny(missing_docs)]
5
6use ucx1_sys::ucs_status_ptr_t;
7use ucx1_sys::ucs_status_t;
8use ucx1_sys::UCS_PTR_IS_ERR;
9use ucx1_sys::UCS_PTR_RAW_STATUS;
10
11#[macro_use]
12extern crate log;
13
/// Test-only helper that drives a future to completion on its own OS thread.
///
/// The expansion calls `std::thread::spawn` and, inside the new thread,
/// builds a current-thread Tokio runtime with the time driver enabled, then
/// blocks on `$task` through a `tokio::task::LocalSet`.
///
/// The macro evaluates to the `JoinHandle` returned by `std::thread::spawn`.
/// The spawned thread panics if the runtime cannot be built.
#[cfg(test)]
macro_rules! spawn_thread {
    ($task:expr) => {
        std::thread::spawn(move || {
            // Build the runtime before the LocalSet so that the LocalSet is
            // dropped first when the closure returns.
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_time()
                .build()
                .unwrap();
            let local_set = tokio::task::LocalSet::new();
            local_set.block_on(&runtime, $task);
            println!("after block!");
        })
    };
}
28
29pub mod ucp;
30
31/// UCX error code.
32#[allow(missing_docs)]
33#[repr(i8)]
34#[derive(thiserror::Error, Debug, PartialEq, Eq)]
35pub enum Error {
36    #[error("Operation in progress")]
37    Inprogress,
38    #[error("No pending message")]
39    NoMessage,
40    #[error("No resources are available to initiate the operation")]
41    NoReource,
42    #[error("Input/output error")]
43    IoError,
44    #[error("Out of memory")]
45    NoMemory,
46    #[error("Invalid parameter")]
47    InvalidParam,
48    #[error("Destination is unreachable")]
49    Unreachable,
50    #[error("Address not valid")]
51    InvalidAddr,
52    #[error("Function not implemented")]
53    NotImplemented,
54    #[error("Message truncated")]
55    MessageTruncated,
56    #[error("No progress")]
57    NoProgress,
58    #[error("Provided buffer is too small")]
59    BufferTooSmall,
60    #[error("No such element")]
61    NoElem,
62    #[error("Failed to connect some of the requested endpoints")]
63    SomeConnectsFailed,
64    #[error("No such device")]
65    NoDevice,
66    #[error("Device is busy")]
67    Busy,
68    #[error("Request canceled")]
69    Canceled,
70    #[error("Shared memory error")]
71    ShmemSegment,
72    #[error("Element already exists")]
73    AlreadyExists,
74    #[error("Index out of range")]
75    OutOfRange,
76    #[error("Operation timed out")]
77    Timeout,
78    #[error("User-defined limit was reached")]
79    ExceedsLimit,
80    #[error("Unsupported operation")]
81    Unsupported,
82    #[error("Operation rejected by remote peer")]
83    Rejected,
84    #[error("Endpoint is not connected")]
85    NotConnected,
86    #[error("Connection reset by remote peer")]
87    ConnectionReset,
88
89    #[error("First link failure")]
90    FirstLinkFailure,
91    #[error("Last link failure")]
92    LastLinkFailure,
93    #[error("First endpoint failure")]
94    FirstEndpointFailure,
95    #[error("Last endpoint failure")]
96    LastEndpointFailure,
97    #[error("Endpoint timeout")]
98    EndpointTimeout,
99
100    #[error("Unknown error")]
101    Unknown,
102}
103
104impl Error {
105    // status != UCS_OK
106    fn from_error(status: ucs_status_t) -> Self {
107        debug_assert_ne!(status, ucs_status_t::UCS_OK);
108
109        match status {
110            ucs_status_t::UCS_INPROGRESS => Self::Inprogress,
111            ucs_status_t::UCS_ERR_NO_MESSAGE => Self::NoMessage,
112            ucs_status_t::UCS_ERR_NO_RESOURCE => Self::NoReource,
113            ucs_status_t::UCS_ERR_IO_ERROR => Self::IoError,
114            ucs_status_t::UCS_ERR_NO_MEMORY => Self::NoMemory,
115            ucs_status_t::UCS_ERR_INVALID_PARAM => Self::InvalidParam,
116            ucs_status_t::UCS_ERR_UNREACHABLE => Self::Unreachable,
117            ucs_status_t::UCS_ERR_INVALID_ADDR => Self::InvalidAddr,
118            ucs_status_t::UCS_ERR_NOT_IMPLEMENTED => Self::NotImplemented,
119            ucs_status_t::UCS_ERR_MESSAGE_TRUNCATED => Self::MessageTruncated,
120            ucs_status_t::UCS_ERR_NO_PROGRESS => Self::NoProgress,
121            ucs_status_t::UCS_ERR_BUFFER_TOO_SMALL => Self::BufferTooSmall,
122            ucs_status_t::UCS_ERR_NO_ELEM => Self::NoElem,
123            ucs_status_t::UCS_ERR_SOME_CONNECTS_FAILED => Self::SomeConnectsFailed,
124            ucs_status_t::UCS_ERR_NO_DEVICE => Self::NoDevice,
125            ucs_status_t::UCS_ERR_BUSY => Self::Busy,
126            ucs_status_t::UCS_ERR_CANCELED => Self::Canceled,
127            ucs_status_t::UCS_ERR_SHMEM_SEGMENT => Self::ShmemSegment,
128            ucs_status_t::UCS_ERR_ALREADY_EXISTS => Self::AlreadyExists,
129            ucs_status_t::UCS_ERR_OUT_OF_RANGE => Self::OutOfRange,
130            ucs_status_t::UCS_ERR_TIMED_OUT => Self::Timeout,
131            ucs_status_t::UCS_ERR_EXCEEDS_LIMIT => Self::ExceedsLimit,
132            ucs_status_t::UCS_ERR_UNSUPPORTED => Self::Unsupported,
133            ucs_status_t::UCS_ERR_REJECTED => Self::Rejected,
134            ucs_status_t::UCS_ERR_NOT_CONNECTED => Self::NotConnected,
135            ucs_status_t::UCS_ERR_CONNECTION_RESET => Self::ConnectionReset,
136
137            ucs_status_t::UCS_ERR_FIRST_LINK_FAILURE => Self::FirstLinkFailure,
138            ucs_status_t::UCS_ERR_LAST_LINK_FAILURE => Self::LastLinkFailure,
139            ucs_status_t::UCS_ERR_FIRST_ENDPOINT_FAILURE => Self::FirstEndpointFailure,
140            ucs_status_t::UCS_ERR_ENDPOINT_TIMEOUT => Self::EndpointTimeout,
141            ucs_status_t::UCS_ERR_LAST_ENDPOINT_FAILURE => Self::LastEndpointFailure,
142
143            _ => Self::Unknown,
144        }
145    }
146
147    #[inline]
148    fn from_status(status: ucs_status_t) -> Result<(), Self> {
149        if status == ucs_status_t::UCS_OK {
150            Ok(())
151        } else {
152            Err(Self::from_error(status))
153        }
154    }
155
156    #[inline]
157    #[allow(dead_code)]
158    fn from_ptr(ptr: ucs_status_ptr_t) -> Result<(), Self> {
159        if UCS_PTR_IS_ERR(ptr) {
160            Err(Self::from_error(UCS_PTR_RAW_STATUS(ptr)))
161        } else {
162            Ok(())
163        }
164    }
165}