1use futures::task::AtomicWaker;
4use std::ffi::CString;
5use std::mem::MaybeUninit;
6use std::os::raw::c_void;
7use std::ptr::{null, null_mut};
8use std::rc::Rc;
9use std::sync::Arc;
10use ucx1_sys::*;
11
12mod endpoint;
13mod listener;
14mod worker;
15
16use crate::Error;
17
18pub use self::endpoint::*;
19pub use self::listener::*;
20pub use self::worker::*;
21
22#[derive(Debug)]
24pub struct Config {
25 handle: *mut ucp_config_t,
26}
27
28impl Default for Config {
29 fn default() -> Self {
30 let mut handle = MaybeUninit::uninit();
31 let status = unsafe { ucp_config_read(null(), null(), handle.as_mut_ptr()) };
32 Error::from_status(status).unwrap();
33
34 Config {
35 handle: unsafe { handle.assume_init() },
36 }
37 }
38}
39
40impl Config {
41 pub fn print_to_stderr(&self) {
46 let flags = ucs_config_print_flags_t::UCS_CONFIG_PRINT_CONFIG
47 | ucs_config_print_flags_t::UCS_CONFIG_PRINT_DOC
48 | ucs_config_print_flags_t::UCS_CONFIG_PRINT_HEADER
49 | ucs_config_print_flags_t::UCS_CONFIG_PRINT_HIDDEN;
50 let title = CString::new("UCP Configuration").expect("Not a valid CStr");
51 unsafe { ucp_config_print(self.handle, stderr, title.as_ptr(), flags) };
52 }
53}
54
55impl Drop for Config {
56 fn drop(&mut self) {
57 unsafe { ucp_config_release(self.handle) };
58 }
59}
60
61#[derive(Debug)]
63pub struct Context {
64 handle: ucp_context_h,
65}
66
67unsafe impl Send for Context {}
69unsafe impl Sync for Context {}
70
71impl Context {
72 pub fn new() -> Result<Arc<Self>, Error> {
74 Self::new_with_config(&Config::default())
75 }
76
77 pub fn new_with_config(config: &Config) -> Result<Arc<Self>, Error> {
79 let features = ucp_feature::UCP_FEATURE_RMA
80 | ucp_feature::UCP_FEATURE_TAG
81 | ucp_feature::UCP_FEATURE_STREAM
82 | ucp_feature::UCP_FEATURE_WAKEUP;
83 #[cfg(feature = "am")]
84 let features = features | ucp_feature::UCP_FEATURE_AM;
85
86 #[allow(clippy::uninit_assumed_init)]
87 let params = ucp_params_t {
88 field_mask: (ucp_params_field::UCP_PARAM_FIELD_FEATURES
89 | ucp_params_field::UCP_PARAM_FIELD_REQUEST_SIZE
90 | ucp_params_field::UCP_PARAM_FIELD_REQUEST_INIT
91 | ucp_params_field::UCP_PARAM_FIELD_REQUEST_CLEANUP
92 | ucp_params_field::UCP_PARAM_FIELD_MT_WORKERS_SHARED)
93 .0 as u64,
94 features: features.0 as u64,
95 request_size: std::mem::size_of::<Request>() as u64,
96 request_init: Some(Request::init),
97 request_cleanup: Some(Request::cleanup),
98 mt_workers_shared: 1,
99 ..unsafe { MaybeUninit::uninit().assume_init() }
100 };
101 let mut handle = MaybeUninit::uninit();
102 let status = unsafe {
103 ucp_init_version(
104 UCP_API_MAJOR,
105 UCP_API_MINOR,
106 ¶ms,
107 config.handle,
108 handle.as_mut_ptr(),
109 )
110 };
111 Error::from_status(status)?;
112
113 Ok(Arc::new(Context {
114 handle: unsafe { handle.assume_init() },
115 }))
116 }
117
118 pub fn create_worker(self: &Arc<Self>) -> Result<Rc<Worker>, Error> {
120 Worker::new(self)
121 }
122
123 pub fn print_to_stderr(&self) {
128 unsafe { ucp_context_print_info(self.handle, stderr) };
129 }
130
131 pub fn query(&self) -> Result<ucp_context_attr, Error> {
133 #[allow(invalid_value)]
134 #[allow(clippy::uninit_assumed_init)]
135 let mut attr = ucp_context_attr {
136 field_mask: (ucp_context_attr_field::UCP_ATTR_FIELD_REQUEST_SIZE
137 | ucp_context_attr_field::UCP_ATTR_FIELD_THREAD_MODE)
138 .0 as u64,
139 ..unsafe { MaybeUninit::uninit().assume_init() }
140 };
141 let status = unsafe { ucp_context_query(self.handle, &mut attr) };
142 Error::from_status(status)?;
143
144 Ok(attr)
145 }
146}
147
148impl Drop for Context {
149 fn drop(&mut self) {
150 unsafe { ucp_cleanup(self.handle) };
151 }
152}
153
154extern "C" {
155 static stderr: *mut FILE;
156}
157
158#[derive(Default)]
170struct Request {
171 waker: AtomicWaker,
172}
173
174impl Request {
175 unsafe extern "C" fn init(request: *mut c_void) {
180 (request as *mut Self).write(Request::default());
181 }
182
183 unsafe extern "C" fn cleanup(request: *mut c_void) {
187 std::ptr::drop_in_place(request as *mut Self)
188 }
189}