async_ucx/ucp/endpoint/
stream.rs1use super::*;
2
3impl Endpoint {
4 pub async fn stream_send(&self, buf: &[u8]) -> Result<usize, Error> {
6 trace!("stream_send: endpoint={:?} len={}", self.handle, buf.len());
7 unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t) {
8 trace!(
9 "stream_send: complete. req={:?}, status={:?}",
10 request,
11 status
12 );
13 let request = &mut *(request as *mut Request);
14 request.waker.wake();
15 }
16 let status = unsafe {
17 ucp_stream_send_nb(
18 self.get_handle()?,
19 buf.as_ptr() as _,
20 buf.len() as _,
21 ucp_dt_make_contig(1),
22 Some(callback),
23 0,
24 )
25 };
26 if status.is_null() {
27 trace!("stream_send: complete");
28 } else if UCS_PTR_IS_PTR(status) {
29 RequestHandle {
30 ptr: status,
31 poll_fn: poll_normal,
32 }
33 .await?;
34 } else {
35 return Err(Error::from_ptr(status).unwrap_err());
36 }
37 Ok(buf.len())
38 }
39
40 pub async fn stream_recv(&self, buf: &mut [MaybeUninit<u8>]) -> Result<usize, Error> {
42 trace!("stream_recv: endpoint={:?} len={}", self.handle, buf.len());
43 unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t, length: u64) {
44 trace!(
45 "stream_recv: complete. req={:?}, status={:?}, len={}",
46 request,
47 status,
48 length
49 );
50 let request = &mut *(request as *mut Request);
51 request.waker.wake();
52 }
53 let mut length = MaybeUninit::uninit();
54 let status = unsafe {
55 ucp_stream_recv_nb(
56 self.get_handle()?,
57 buf.as_mut_ptr() as _,
58 buf.len() as _,
59 ucp_dt_make_contig(1),
60 Some(callback),
61 length.as_mut_ptr(),
62 0,
63 )
64 };
65 if status.is_null() {
66 let length = unsafe { length.assume_init() } as usize;
67 trace!("stream_recv: complete. len={}", length);
68 Ok(length)
69 } else if UCS_PTR_IS_PTR(status) {
70 Ok(RequestHandle {
71 ptr: status,
72 poll_fn: poll_stream,
73 }
74 .await)
75 } else {
76 Err(Error::from_ptr(status).unwrap_err())
77 }
78 }
79}
80
81unsafe fn poll_stream(ptr: ucs_status_ptr_t) -> Poll<usize> {
82 let mut len = MaybeUninit::<usize>::uninit();
83 let status = ucp_stream_recv_request_test(ptr as _, len.as_mut_ptr() as _);
84 if status == ucs_status_t::UCS_INPROGRESS {
85 Poll::Pending
86 } else {
87 Poll::Ready(len.assume_init())
88 }
89}