wit_bindgen/rt/async_support/stream_support.rs
1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7 crate::rt::Cleanup,
8 std::{
9 alloc::Layout,
10 fmt,
11 future::Future,
12 pin::Pin,
13 ptr,
14 sync::atomic::{AtomicU32, Ordering::Relaxed},
15 task::{Context, Poll},
16 vec::Vec,
17 },
18};
19
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
    /// The Rust type that's sent or received on this stream.
    type Payload: 'static;

    /// The `stream.new` intrinsic.
    ///
    /// Both handles are packed into the returned `u64`: `raw_stream_new`
    /// takes the readable end from the low 32 bits and the writable end from
    /// the high 32 bits.
    fn new(&mut self) -> u64;

    /// The canonical ABI layout of the type that this stream is
    /// sending/receiving.
    fn elem_layout(&self) -> Layout;

    /// Returns whether `Self::Payload` is natively in its canonical ABI
    /// representation.
    ///
    /// If this returns `true` then no `lift` is needed and a read lands
    /// directly in the spare capacity of the destination `Vec`. If this
    /// returns `false` then values are read into a separate canonical ABI
    /// buffer and converted one at a time with `lift`.
    fn native_abi_matches_canonical_abi(&self) -> bool;

    /// Returns whether `Self::Payload` has lists that need to be deallocated
    /// with `dealloc_lists`.
    fn contains_lists(&self) -> bool;

    /// Converts a Rust type to its canonical ABI representation.
    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
    /// Used to deallocate any Rust-owned lists in the canonical ABI
    /// representation for when a value is successfully sent but needs to be
    /// cleaned up.
    unsafe fn dealloc_lists(&mut self, dst: *mut u8);
    /// Converts from the canonical ABI representation to a Rust value.
    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
    /// The `stream.write` intrinsic
    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
    /// The `stream.read` intrinsic
    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
    /// The `stream.cancel-read` intrinsic
    unsafe fn cancel_read(&mut self, stream: u32) -> u32;
    /// The `stream.cancel-write` intrinsic
    unsafe fn cancel_write(&mut self, stream: u32) -> u32;
    /// The `stream.drop-readable` intrinsic
    unsafe fn drop_readable(&mut self, stream: u32);
    /// The `stream.drop-writable` intrinsic
    unsafe fn drop_writable(&mut self, stream: u32);
}
/// A table of function pointers providing the [`StreamOps`] operations for
/// streams whose payload type is `T`.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// An optional callback where if provided will lower an owned `T` value
    /// into the `dst` pointer.
    ///
    /// If this is called the ownership of all of `T`'s lists and resources are
    /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
    /// the canonical ABI layout.
    ///
    /// If this is `None` then it means that `T` has the same layout in-memory
    /// in Rust as it does in the canonical ABI. In such a situation the
    /// lower/lift operation can be skipped.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`, and like `lower` if this is missing then it means that
    /// `T` has the same in-memory representation in Rust and the canonical ABI.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    ///
    /// The `u64` result carries both handles of the new stream.
    pub new: unsafe extern "C" fn() -> u64,
}
111
112unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
113 type Payload = T;
114
115 fn new(&mut self) -> u64 {
116 unsafe { (self.new)() }
117 }
118 fn elem_layout(&self) -> Layout {
119 self.layout
120 }
121 fn native_abi_matches_canonical_abi(&self) -> bool {
122 self.lift.is_none()
123 }
124 fn contains_lists(&self) -> bool {
125 self.dealloc_lists.is_some()
126 }
127 unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
128 if let Some(f) = self.lower {
129 f(payload, dst)
130 }
131 }
132 unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
133 if let Some(f) = self.dealloc_lists {
134 f(dst)
135 }
136 }
137 unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
138 (self.lift.unwrap())(dst)
139 }
140 unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
141 (self.start_write)(stream, val, amt)
142 }
143 unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
144 (self.start_read)(stream, val, amt)
145 }
146 unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
147 (self.cancel_read)(stream)
148 }
149 unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
150 (self.cancel_write)(stream)
151 }
152 unsafe fn drop_readable(&mut self, stream: u32) {
153 (self.drop_readable)(stream)
154 }
155 unsafe fn drop_writable(&mut self, stream: u32) {
156 (self.drop_writable)(stream)
157 }
158}
159
/// Helper function to create a new read/write pair for a component model
/// stream.
///
/// This is [`raw_stream_new`] specialized to a `&'static StreamVtable<T>`;
/// both returned ends perform their operations through `vtable`.
///
/// # Safety
///
/// NOTE(review): presumably `vtable` must contain the intrinsics and
/// lift/lower callbacks that actually correspond to `T` -- confirm against
/// the bindings generator.
pub unsafe fn stream_new<T>(
    vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
    // Delegate to the generic constructor.
    unsafe { raw_stream_new(vtable) }
}
167
168/// Helper function to create a new read/write pair for a component model
169/// stream.
170pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
171where
172 O: StreamOps + Clone,
173{
174 unsafe {
175 let handles = ops.new();
176 let reader = handles as u32;
177 let writer = (handles >> 32) as u32;
178 rtdebug!("stream.new() = [{writer}, {reader}]");
179 (
180 RawStreamWriter::new(writer, ops.clone()),
181 RawStreamReader::new(reader, ops),
182 )
183 }
184}
185
/// Represents the writable end of a Component Model `stream`.
///
/// This is [`RawStreamWriter`] with its operations supplied by a
/// `&'static StreamVtable<T>`.
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
188
/// Represents the writable end of a Component Model `stream`.
///
/// Dropping this value invokes the `stream.drop-writable` operation on the
/// handle.
pub struct RawStreamWriter<O: StreamOps> {
    // Component-model handle index of the writable end.
    handle: u32,
    // Operations (intrinsics, lift/lower) used to drive this stream.
    ops: O,
    // Set by the write operation when it observes that the other end has
    // dropped; once set, new writes report `DROPPED` without calling
    // `stream.write`.
    done: bool,
}
195
196impl<O> RawStreamWriter<O>
197where
198 O: StreamOps,
199{
200 #[doc(hidden)]
201 pub unsafe fn new(handle: u32, ops: O) -> Self {
202 Self {
203 handle,
204 ops,
205 done: false,
206 }
207 }
208
209 /// Returns the index of the component-model handle that this stream is
210 /// using.
211 pub fn handle(&self) -> u32 {
212 self.handle
213 }
214
215 /// Initiate a write of the `values` provided into this stream.
216 ///
217 /// This method is akin to an `async fn` except that the returned
218 /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
219 /// to re-acquire undelivered values.
220 ///
221 /// This method will perform at most a single write of the `values`
222 /// provided. The returned future will resolve once the write has completed.
223 ///
224 /// # Return Values
225 ///
226 /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
227 /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
228 /// were sent from `values` into this writer. A result of
229 /// `StreamResult::Dropped` means that no values were sent and the other side
230 /// has hung-up and sending values will no longer be possible.
231 ///
232 /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
233 /// original `values` provided here. That can be used to re-acquire `values`
234 /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
235 /// internal cursor of how many values have been written and if the write
236 /// should be resumed to write the entire buffer then the
237 /// [`StreamWriter::write_buf`] method can be used to resume writing at the
238 /// next value in the buffer.
239 ///
240 /// # Cancellation
241 ///
242 /// The returned [`StreamWrite`] future can be cancelled like any other Rust
243 /// future via `drop`, but this means that `values` will be lost within the
244 /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
245 /// in-progress write that is being done with `values`. This is effectively
246 /// a way of forcing the future to immediately resolve.
247 ///
248 /// Note that if this future is cancelled via `drop` it does not mean that
249 /// no values were sent. It may be possible that values were still sent
250 /// despite being cancelled. Cancelling a write and determining what
251 /// happened must be done with [`StreamWrite::cancel`].
252 pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
253 self.write_buf(AbiBuffer::new(values, self.ops.clone()))
254 }
255
256 /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
257 /// instead of `Vec<T>`.
258 pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
259 RawStreamWrite {
260 op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
261 }
262 }
263
264 /// Writes all of the `values` provided into this stream.
265 ///
266 /// This is a higher-level method than [`StreamWriter::write`] and does not
267 /// expose cancellation for example. This will successively attempt to write
268 /// all of `values` provided into this stream. Upon completion the same
269 /// vector will be returned and any remaining elements in the vector were
270 /// not sent because the stream was dropped.
271 pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
272 // Perform an initial write which converts `values` into `AbiBuffer`.
273 let (mut status, mut buf) = self.write(values).await;
274
275 // While the previous write completed and there's still remaining items
276 // in the buffer, perform another write.
277 while let StreamResult::Complete(_) = status {
278 if buf.remaining() == 0 {
279 break;
280 }
281 (status, buf) = self.write_buf(buf).await;
282
283 // FIXME(WebAssembly/component-model#490)
284 if status == StreamResult::Cancelled {
285 status = StreamResult::Complete(0);
286 }
287 }
288
289 // Return back any values that weren't written by shifting them to the
290 // front of the returned vector.
291 assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
292 buf.into_vec()
293 }
294
295 /// Writes the singular `value` provided
296 ///
297 /// This is a higher-level method than [`StreamWriter::write`] and does not
298 /// expose cancellation for example. This will attempt to send `value` on
299 /// this stream.
300 ///
301 /// If the other end hangs up then the value is returned back as
302 /// `Some(value)`, otherwise `None` is returned indicating the value was
303 /// sent.
304 pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
305 // TODO: can probably be a bit more efficient about this and avoid
306 // moving `value` onto the heap in some situations, but that's left as
307 // an optimization for later.
308 self.write_all(std::vec![value]).await.pop()
309 }
310}
311
312impl<O> fmt::Debug for RawStreamWriter<O>
313where
314 O: StreamOps,
315{
316 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317 f.debug_struct("StreamWriter")
318 .field("handle", &self.handle)
319 .finish()
320 }
321}
322
323impl<O> Drop for RawStreamWriter<O>
324where
325 O: StreamOps,
326{
327 fn drop(&mut self) {
328 rtdebug!("stream.drop-writable({})", self.handle);
329 unsafe {
330 self.ops.drop_writable(self.handle);
331 }
332 }
333}
334
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is [`RawStreamWrite`] with its operations supplied by a
/// `&'static StreamVtable<T>`.
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
337
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, AbiBuffer<O>)`.
pub struct RawStreamWrite<'a, O: StreamOps> {
    // The underlying waitable operation; polling and cancellation are both
    // forwarded to it.
    op: WaitableOperation<StreamWriteOp<'a, O>>,
}
342
// State for a single write operation, implementing `WaitableOp`.
struct StreamWriteOp<'a, O: StreamOps> {
    // The writer being written to, borrowed exclusively for the duration of
    // the operation.
    writer: &'a mut RawStreamWriter<O>,
}
346
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    Complete(usize),
    /// No values were transferred, the other end has dropped its handle.
    Dropped,
    /// No values were transferred, the operation was cancelled.
    Cancelled,
}
361
362unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
363where
364 O: StreamOps,
365{
366 type Start = AbiBuffer<O>;
367 type InProgress = AbiBuffer<O>;
368 type Result = (StreamResult, AbiBuffer<O>);
369 type Cancel = (StreamResult, AbiBuffer<O>);
370
371 fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
372 if self.writer.done {
373 return (DROPPED, buf);
374 }
375
376 let (ptr, len) = buf.abi_ptr_and_len();
377 // SAFETY: sure hope this is safe, everything in this module and
378 // `AbiBuffer` is trying to make this safe.
379 let code = unsafe { self.writer.ops.start_write(self.writer.handle, ptr, len) };
380 rtdebug!(
381 "stream.write({}, {ptr:?}, {len}) = {code:#x}",
382 self.writer.handle
383 );
384 (code, buf)
385 }
386
387 fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
388 (StreamResult::Cancelled, buf)
389 }
390
391 fn in_progress_update(
392 &mut self,
393 mut buf: Self::InProgress,
394 code: u32,
395 ) -> Result<Self::Result, Self::InProgress> {
396 match ReturnCode::decode(code) {
397 ReturnCode::Blocked => Err(buf),
398 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
399 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
400 code @ (ReturnCode::Completed(amt)
401 | ReturnCode::Dropped(amt)
402 | ReturnCode::Cancelled(amt)) => {
403 let amt = amt.try_into().unwrap();
404 buf.advance(amt);
405 if let ReturnCode::Dropped(_) = code {
406 self.writer.done = true;
407 }
408 Ok((StreamResult::Complete(amt), buf))
409 }
410 }
411 }
412
413 fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
414 self.writer.handle
415 }
416
417 fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
418 // SAFETY: we're managing `writer` and all the various operational bits,
419 // so this relies on `WaitableOperation` being safe.
420 let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
421 rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
422 code
423 }
424
425 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
426 result
427 }
428}
429
// Polling a `RawStreamWrite` forwards to the wrapped `WaitableOperation`.
impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
    /// The status of the write plus the buffer that was being written.
    type Output = (StreamResult, AbiBuffer<O>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project the pin onto `op` and let it drive the operation.
        self.pin_project().poll_complete(cx)
    }
}
437
impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
    /// Projects a pinned `Self` to its pinned `op` field.
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this write if it hasn't already completed.
    ///
    /// This method can be used to cancel a write-in-progress and re-acquire
    /// values being sent. Note that the result here may still indicate that
    /// some values were written if the race to cancel the write was lost.
    ///
    /// Returns the final status of the write together with the buffer that
    /// was being written.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
        self.pin_project().cancel()
    }
}
459
/// Represents the readable end of a Component Model `stream`.
///
/// This is [`RawStreamReader`] with its operations supplied by a
/// `&'static StreamVtable<T>`.
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
462
/// Represents the readable end of a Component Model `stream`.
///
/// Dropping this value invokes the `stream.drop-readable` operation on the
/// handle, unless the handle was previously removed with `take_handle`.
pub struct RawStreamReader<O: StreamOps> {
    // Component-model handle index of the readable end. `u32::MAX` is a
    // sentinel meaning the handle has been taken out of this reader.
    handle: AtomicU32,
    // Operations (intrinsics, lift/lower) used to drive this stream.
    ops: O,
    // Set by the read operation when it observes that the other end has
    // dropped; once set, new reads report `DROPPED` without calling
    // `stream.read`.
    done: bool,
}
469
470impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472 f.debug_struct("StreamReader")
473 .field("handle", &self.handle)
474 .finish()
475 }
476}
477
478impl<O: StreamOps> RawStreamReader<O> {
479 #[doc(hidden)]
480 pub fn new(handle: u32, ops: O) -> Self {
481 Self {
482 handle: AtomicU32::new(handle),
483 ops,
484 done: false,
485 }
486 }
487
488 #[doc(hidden)]
489 pub fn take_handle(&self) -> u32 {
490 let ret = self.opt_handle().unwrap();
491 self.handle.store(u32::MAX, Relaxed);
492 ret
493 }
494
495 /// Returns the index of the component-model handle that this stream is
496 /// using.
497 pub fn handle(&self) -> u32 {
498 self.opt_handle().unwrap()
499 }
500
501 fn opt_handle(&self) -> Option<u32> {
502 match self.handle.load(Relaxed) {
503 u32::MAX => None,
504 other => Some(other),
505 }
506 }
507
508 /// Starts a new read operation on this stream into `buf`.
509 ///
510 /// This method will read values into the spare capacity of the `buf`
511 /// provided. If `buf` has no spare capacity then this will be equivalent
512 /// to a zero-length read.
513 ///
514 /// Upon completion the `buf` will be yielded back to the caller via the
515 /// completion of the [`StreamRead`] future.
516 ///
517 /// # Cancellation
518 ///
519 /// Cancelling the returned future can be done with `drop` like all Rust
520 /// futures, but it does not mean that no values were read. To accurately
521 /// determine if values were read the [`StreamRead::cancel`] method must be
522 /// used.
523 pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
524 RawStreamRead {
525 op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
526 }
527 }
528
529 /// Reads a single item from this stream.
530 ///
531 /// This is a higher-level method than [`StreamReader::read`] in that it
532 /// reads only a single item and does not expose control over cancellation.
533 pub async fn next(&mut self) -> Option<O::Payload> {
534 // TODO: should amortize this allocation and avoid doing it every time.
535 // Or somehow perhaps make this more optimal.
536 let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
537 buf.pop()
538 }
539
540 /// Reads all items from this stream and returns the list.
541 ///
542 /// This method will read all remaining items from this stream into a list
543 /// and await the stream to be dropped.
544 pub async fn collect(mut self) -> Vec<O::Payload> {
545 let mut ret = Vec::new();
546 loop {
547 // If there's no more spare capacity then reserve room for one item
548 // which should trigger `Vec`'s built-in resizing logic, which will
549 // free up likely more capacity than just one slot.
550 if ret.len() == ret.capacity() {
551 ret.reserve(1);
552 }
553 let (status, buf) = self.read(ret).await;
554 ret = buf;
555 match status {
556 StreamResult::Complete(_) => {}
557 StreamResult::Dropped => break,
558 StreamResult::Cancelled => unreachable!(),
559 }
560 }
561 ret
562 }
563}
564
565impl<O: StreamOps> Drop for RawStreamReader<O> {
566 fn drop(&mut self) {
567 let Some(handle) = self.opt_handle() else {
568 return;
569 };
570 unsafe {
571 rtdebug!("stream.drop-readable({})", handle);
572 self.ops.drop_readable(handle);
573 }
574 }
575}
576
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is [`RawStreamRead`] with its operations supplied by a
/// `&'static StreamVtable<T>`.
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
579
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, Vec<O::Payload>)`.
pub struct RawStreamRead<'a, O: StreamOps> {
    // The underlying waitable operation; polling and cancellation are both
    // forwarded to it.
    op: WaitableOperation<StreamReadOp<'a, O>>,
}
584
// State for a single read operation, implementing `WaitableOp`.
struct StreamReadOp<'a, O: StreamOps> {
    // The reader being read from, borrowed exclusively for the duration of
    // the operation.
    reader: &'a mut RawStreamReader<O>,
}
588
589unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
590 type Start = Vec<O::Payload>;
591 type InProgress = (Vec<O::Payload>, Option<Cleanup>);
592 type Result = (StreamResult, Vec<O::Payload>);
593 type Cancel = (StreamResult, Vec<O::Payload>);
594
595 fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
596 if self.reader.done {
597 return (DROPPED, (buf, None));
598 }
599
600 let cap = buf.spare_capacity_mut();
601 let ptr;
602 let cleanup;
603 // If `T` requires a lifting operation, then allocate a slab of memory
604 // which will store the canonical ABI read. Otherwise we can use the
605 // raw capacity in `buf` itself.
606 if self.reader.ops.native_abi_matches_canonical_abi() {
607 ptr = cap.as_mut_ptr().cast();
608 cleanup = None;
609 } else {
610 let elem_layout = self.reader.ops.elem_layout();
611 let layout =
612 Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
613 .unwrap();
614 (ptr, cleanup) = Cleanup::new(layout);
615 }
616 // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
617 // persist with this async operation itself.
618 let code = unsafe {
619 self.reader
620 .ops
621 .start_read(self.reader.handle(), ptr, cap.len())
622 };
623 rtdebug!(
624 "stream.read({}, {ptr:?}, {}) = {code:#x}",
625 self.reader.handle(),
626 cap.len()
627 );
628 (code, (buf, cleanup))
629 }
630
631 fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
632 (StreamResult::Cancelled, buf)
633 }
634
635 fn in_progress_update(
636 &mut self,
637 (mut buf, cleanup): Self::InProgress,
638 code: u32,
639 ) -> Result<Self::Result, Self::InProgress> {
640 match ReturnCode::decode(code) {
641 ReturnCode::Blocked => Err((buf, cleanup)),
642
643 // Note that the `cleanup`, if any, is discarded here.
644 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
645
646 // When an in-progress read is successfully cancelled then the
647 // allocation that was being read into, if any, is just discarded.
648 //
649 // TODO: should maybe thread this around like `AbiBuffer` to cache
650 // the read allocation?
651 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
652
653 code @ (ReturnCode::Completed(amt)
654 | ReturnCode::Dropped(amt)
655 | ReturnCode::Cancelled(amt)) => {
656 let amt = usize::try_from(amt).unwrap();
657 let cur_len = buf.len();
658 assert!(amt <= buf.capacity() - cur_len);
659
660 if self.reader.ops.native_abi_matches_canonical_abi() {
661 // If no `lift` was necessary, then the results of this operation
662 // were read directly into `buf`, so just update its length now that
663 // values have been initialized.
664 unsafe {
665 buf.set_len(cur_len + amt);
666 }
667 } else {
668 // With a `lift` operation this now requires reading `amt` items
669 // from `cleanup` and pushing them into `buf`.
670 let mut ptr = cleanup
671 .as_ref()
672 .map(|c| c.ptr.as_ptr())
673 .unwrap_or(ptr::null_mut());
674 for _ in 0..amt {
675 unsafe {
676 buf.push(self.reader.ops.lift(ptr));
677 ptr = ptr.add(self.reader.ops.elem_layout().size());
678 }
679 }
680 }
681
682 // Intentionally dispose of `cleanup` here as, if it was used, all
683 // allocations have been read from it and appended to `buf`.
684 drop(cleanup);
685 if let ReturnCode::Dropped(_) = code {
686 self.reader.done = true;
687 }
688 Ok((StreamResult::Complete(amt), buf))
689 }
690 }
691 }
692
693 fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
694 self.reader.handle()
695 }
696
697 fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
698 // SAFETY: we're managing `reader` and all the various operational bits,
699 // so this relies on `WaitableOperation` being safe.
700 let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
701 rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
702 code
703 }
704
705 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
706 result
707 }
708}
709
// Polling a `RawStreamRead` forwards to the wrapped `WaitableOperation`.
impl<O: StreamOps> Future for RawStreamRead<'_, O> {
    /// The status of the read plus the buffer that was being read into.
    type Output = (StreamResult, Vec<O::Payload>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project the pin onto `op` and let it drive the operation.
        self.pin_project().poll_complete(cx)
    }
}
717
impl<'a, O> RawStreamRead<'a, O>
where
    O: StreamOps,
{
    /// Projects a pinned `Self` to its pinned `op` field.
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this read if it hasn't already completed.
    ///
    /// This method will initiate a cancellation operation for this active
    /// read. This may race with the actual read itself and so this may actually
    /// complete with some results.
    ///
    /// The final result of cancellation is returned, along with the original
    /// buffer.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
        self.pin_project().cancel()
    }
}