async_ucx/ucp/endpoint/
stream.rs

1use super::*;
2
3impl Endpoint {
4    /// Sends data through stream.
5    pub async fn stream_send(&self, buf: &[u8]) -> Result<usize, Error> {
6        trace!("stream_send: endpoint={:?} len={}", self.handle, buf.len());
7        unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t) {
8            trace!(
9                "stream_send: complete. req={:?}, status={:?}",
10                request,
11                status
12            );
13            let request = &mut *(request as *mut Request);
14            request.waker.wake();
15        }
16        let status = unsafe {
17            ucp_stream_send_nb(
18                self.get_handle()?,
19                buf.as_ptr() as _,
20                buf.len() as _,
21                ucp_dt_make_contig(1),
22                Some(callback),
23                0,
24            )
25        };
26        if status.is_null() {
27            trace!("stream_send: complete");
28        } else if UCS_PTR_IS_PTR(status) {
29            RequestHandle {
30                ptr: status,
31                poll_fn: poll_normal,
32            }
33            .await?;
34        } else {
35            return Err(Error::from_ptr(status).unwrap_err());
36        }
37        Ok(buf.len())
38    }
39
40    /// Receives data from stream.
41    pub async fn stream_recv(&self, buf: &mut [MaybeUninit<u8>]) -> Result<usize, Error> {
42        trace!("stream_recv: endpoint={:?} len={}", self.handle, buf.len());
43        unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t, length: u64) {
44            trace!(
45                "stream_recv: complete. req={:?}, status={:?}, len={}",
46                request,
47                status,
48                length
49            );
50            let request = &mut *(request as *mut Request);
51            request.waker.wake();
52        }
53        let mut length = MaybeUninit::uninit();
54        let status = unsafe {
55            ucp_stream_recv_nb(
56                self.get_handle()?,
57                buf.as_mut_ptr() as _,
58                buf.len() as _,
59                ucp_dt_make_contig(1),
60                Some(callback),
61                length.as_mut_ptr(),
62                0,
63            )
64        };
65        if status.is_null() {
66            let length = unsafe { length.assume_init() } as usize;
67            trace!("stream_recv: complete. len={}", length);
68            Ok(length)
69        } else if UCS_PTR_IS_PTR(status) {
70            Ok(RequestHandle {
71                ptr: status,
72                poll_fn: poll_stream,
73            }
74            .await)
75        } else {
76            Err(Error::from_ptr(status).unwrap_err())
77        }
78    }
79}
80
81unsafe fn poll_stream(ptr: ucs_status_ptr_t) -> Poll<usize> {
82    let mut len = MaybeUninit::<usize>::uninit();
83    let status = ucp_stream_recv_request_test(ptr as _, len.as_mut_ptr() as _);
84    if status == ucs_status_t::UCS_INPROGRESS {
85        Poll::Pending
86    } else {
87        Poll::Ready(len.assume_init())
88    }
89}