async_ucx/ucp/
mod.rs

1//! Unified Communication Protocol (UCP).
2
3use futures::task::AtomicWaker;
4use std::ffi::CString;
5use std::mem::MaybeUninit;
6use std::os::raw::c_void;
7use std::ptr::{null, null_mut};
8use std::rc::Rc;
9use std::sync::Arc;
10use ucx1_sys::*;
11
12mod endpoint;
13mod listener;
14mod worker;
15
16use crate::Error;
17
18pub use self::endpoint::*;
19pub use self::listener::*;
20pub use self::worker::*;
21
22/// The configuration for UCP application context.
23#[derive(Debug)]
24pub struct Config {
25    handle: *mut ucp_config_t,
26}
27
28impl Default for Config {
29    fn default() -> Self {
30        let mut handle = MaybeUninit::uninit();
31        let status = unsafe { ucp_config_read(null(), null(), handle.as_mut_ptr()) };
32        Error::from_status(status).unwrap();
33
34        Config {
35            handle: unsafe { handle.assume_init() },
36        }
37    }
38}
39
40impl Config {
41    /// Prints information about the context configuration.
42    ///
43    /// Including memory domains, transport resources, and other useful
44    /// information associated with the context.
45    pub fn print_to_stderr(&self) {
46        let flags = ucs_config_print_flags_t::UCS_CONFIG_PRINT_CONFIG
47            | ucs_config_print_flags_t::UCS_CONFIG_PRINT_DOC
48            | ucs_config_print_flags_t::UCS_CONFIG_PRINT_HEADER
49            | ucs_config_print_flags_t::UCS_CONFIG_PRINT_HIDDEN;
50        let title = CString::new("UCP Configuration").expect("Not a valid CStr");
51        unsafe { ucp_config_print(self.handle, stderr, title.as_ptr(), flags) };
52    }
53}
54
55impl Drop for Config {
56    fn drop(&mut self) {
57        unsafe { ucp_config_release(self.handle) };
58    }
59}
60
61/// An object that holds a UCP communication instance's global information.
62#[derive(Debug)]
63pub struct Context {
64    handle: ucp_context_h,
65}
66
67// Context is thread safe.
68unsafe impl Send for Context {}
69unsafe impl Sync for Context {}
70
71impl Context {
72    /// Creates and initializes a UCP application context with default configuration.
73    pub fn new() -> Result<Arc<Self>, Error> {
74        Self::new_with_config(&Config::default())
75    }
76
77    /// Creates and initializes a UCP application context with specified configuration.
78    pub fn new_with_config(config: &Config) -> Result<Arc<Self>, Error> {
79        let features = ucp_feature::UCP_FEATURE_RMA
80            | ucp_feature::UCP_FEATURE_TAG
81            | ucp_feature::UCP_FEATURE_STREAM
82            | ucp_feature::UCP_FEATURE_WAKEUP;
83        #[cfg(feature = "am")]
84        let features = features | ucp_feature::UCP_FEATURE_AM;
85
86        #[allow(clippy::uninit_assumed_init)]
87        let params = ucp_params_t {
88            field_mask: (ucp_params_field::UCP_PARAM_FIELD_FEATURES
89                | ucp_params_field::UCP_PARAM_FIELD_REQUEST_SIZE
90                | ucp_params_field::UCP_PARAM_FIELD_REQUEST_INIT
91                | ucp_params_field::UCP_PARAM_FIELD_REQUEST_CLEANUP
92                | ucp_params_field::UCP_PARAM_FIELD_MT_WORKERS_SHARED)
93                .0 as u64,
94            features: features.0 as u64,
95            request_size: std::mem::size_of::<Request>() as u64,
96            request_init: Some(Request::init),
97            request_cleanup: Some(Request::cleanup),
98            mt_workers_shared: 1,
99            ..unsafe { MaybeUninit::uninit().assume_init() }
100        };
101        let mut handle = MaybeUninit::uninit();
102        let status = unsafe {
103            ucp_init_version(
104                UCP_API_MAJOR,
105                UCP_API_MINOR,
106                &params,
107                config.handle,
108                handle.as_mut_ptr(),
109            )
110        };
111        Error::from_status(status)?;
112
113        Ok(Arc::new(Context {
114            handle: unsafe { handle.assume_init() },
115        }))
116    }
117
118    /// Create a `Worker` object.
119    pub fn create_worker(self: &Arc<Self>) -> Result<Rc<Worker>, Error> {
120        Worker::new(self)
121    }
122
123    /// Prints information about the context configuration.
124    ///
125    /// Including memory domains, transport resources, and
126    /// other useful information associated with the context.
127    pub fn print_to_stderr(&self) {
128        unsafe { ucp_context_print_info(self.handle, stderr) };
129    }
130
131    /// Fetches information about the context.
132    pub fn query(&self) -> Result<ucp_context_attr, Error> {
133        #[allow(invalid_value)]
134        #[allow(clippy::uninit_assumed_init)]
135        let mut attr = ucp_context_attr {
136            field_mask: (ucp_context_attr_field::UCP_ATTR_FIELD_REQUEST_SIZE
137                | ucp_context_attr_field::UCP_ATTR_FIELD_THREAD_MODE)
138                .0 as u64,
139            ..unsafe { MaybeUninit::uninit().assume_init() }
140        };
141        let status = unsafe { ucp_context_query(self.handle, &mut attr) };
142        Error::from_status(status)?;
143
144        Ok(attr)
145    }
146}
147
148impl Drop for Context {
149    fn drop(&mut self) {
150        unsafe { ucp_cleanup(self.handle) };
151    }
152}
153
154extern "C" {
155    static stderr: *mut FILE;
156}
157
158/// Our defined request structure stored at `ucs_status_ptr_t`.
159///
160/// To enable this, set the following fields in `ucp_params_t` when initializing
161/// UCP context:
162/// ```ignore
163/// ucp_params_t {
164///     request_size: std::mem::size_of::<Request>() as u64,
165///     request_init: Some(Request::init),
166///     request_cleanup: Some(Request::cleanup),
167/// }
168/// ```
169#[derive(Default)]
170struct Request {
171    waker: AtomicWaker,
172}
173
174impl Request {
175    /// Initialize request.
176    ///
177    /// This function will be called only on the very first time a request memory
178    /// is initialized, and may not be called again if a request is reused.
179    unsafe extern "C" fn init(request: *mut c_void) {
180        (request as *mut Self).write(Request::default());
181    }
182
183    /// Final cleanup of the memory associated with the request.
184    ///
185    /// This routine may not be called every time a request is released.
186    unsafe extern "C" fn cleanup(request: *mut c_void) {
187        std::ptr::drop_in_place(request as *mut Self)
188    }
189}