wit_bindgen/rt/async_support/
stream_support.rs

1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7    crate::rt::Cleanup,
8    std::{
9        alloc::Layout,
10        fmt,
11        future::Future,
12        pin::Pin,
13        ptr,
14        sync::atomic::{AtomicU32, Ordering::Relaxed},
15        task::{Context, Poll},
16        vec::Vec,
17    },
18};
19
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
    /// The Rust type that's sent or received on this stream.
    type Payload: 'static;

    /// The `stream.new` intrinsic.
    ///
    /// The returned `u64` packs both new handles: `raw_stream_new` takes the
    /// reader handle from the low 32 bits and the writer handle from the
    /// high 32 bits.
    fn new(&mut self) -> u64;

    /// The canonical ABI layout of the type that this stream is
    /// sending/receiving.
    fn elem_layout(&self) -> Layout;

    /// Returns whether `Self::Payload` is natively in its canonical ABI
    /// representation.
    ///
    /// If this returns `true` then no `lift` or `lower` is required and
    /// values can be transferred directly from/to Rust memory. If this
    /// returns `false` then `lift`/`lower` must be used to convert between
    /// `Self::Payload` and its canonical ABI representation.
    fn native_abi_matches_canonical_abi(&self) -> bool;

    /// Returns whether `Self::Payload` has lists that need to be deallocated
    /// with `dealloc_lists`.
    fn contains_lists(&self) -> bool;

    /// Converts a Rust type to its canonical ABI representation.
    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
    /// Used to deallocate any Rust-owned lists in the canonical ABI
    /// representation for when a value is successfully sent but needs to be
    /// cleaned up.
    unsafe fn dealloc_lists(&mut self, dst: *mut u8);
    /// Converts from the canonical ABI representation to a Rust value.
    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
    /// The `stream.write` intrinsic
    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
    /// The `stream.read` intrinsic
    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
    /// The `stream.cancel-read` intrinsic
    unsafe fn cancel_read(&mut self, stream: u32) -> u32;
    /// The `stream.cancel-write` intrinsic
    unsafe fn cancel_write(&mut self, stream: u32) -> u32;
    /// The `stream.drop-readable` intrinsic
    unsafe fn drop_readable(&mut self, stream: u32);
    /// The `stream.drop-writable` intrinsic
    unsafe fn drop_writable(&mut self, stream: u32);
}
/// Table of function pointers and layout information describing a stream of
/// `T` values.
///
/// A shared reference to this table implements [`StreamOps`] by dispatching
/// to the fields below.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// An optional callback where if provided will lower an owned `T` value
    /// into the `dst` pointer.
    ///
    /// If this is called the ownership of all of `T`'s lists and resources are
    /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
    /// the canonical ABI layout.
    ///
    /// If this is `None` then it means that `T` has the same layout in-memory
    /// in Rust as it does in the canonical ABI. In such a situation the
    /// lower/lift operation can be skipped.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`, and like `lower` if this is missing then it means that
    /// `T` has the same in-memory representation in Rust and the canonical ABI.
    ///
    /// The `StreamOps` implementation for `&StreamVtable<T>` uses the absence
    /// of this callback to answer `native_abi_matches_canonical_abi`.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}
111
112unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
113    type Payload = T;
114
115    fn new(&mut self) -> u64 {
116        unsafe { (self.new)() }
117    }
118    fn elem_layout(&self) -> Layout {
119        self.layout
120    }
121    fn native_abi_matches_canonical_abi(&self) -> bool {
122        self.lift.is_none()
123    }
124    fn contains_lists(&self) -> bool {
125        self.dealloc_lists.is_some()
126    }
127    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
128        if let Some(f) = self.lower {
129            f(payload, dst)
130        }
131    }
132    unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
133        if let Some(f) = self.dealloc_lists {
134            f(dst)
135        }
136    }
137    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
138        (self.lift.unwrap())(dst)
139    }
140    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
141        (self.start_write)(stream, val, amt)
142    }
143    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
144        (self.start_read)(stream, val, amt)
145    }
146    unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
147        (self.cancel_read)(stream)
148    }
149    unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
150        (self.cancel_write)(stream)
151    }
152    unsafe fn drop_readable(&mut self, stream: u32) {
153        (self.drop_readable)(stream)
154    }
155    unsafe fn drop_writable(&mut self, stream: u32) {
156        (self.drop_writable)(stream)
157    }
158}
159
/// Helper function to create a new read/write pair for a component model
/// stream.
///
/// This forwards to [`raw_stream_new`], using `vtable` as the [`StreamOps`]
/// implementation for both returned halves.
///
/// # Safety
///
/// NOTE(review): the precise contract is not spelled out in this module.
/// Presumably `vtable` must accurately describe `T` (layout, lift/lower) and
/// hold the real intrinsics for a stream of `T` — confirm against the
/// generated bindings.
pub unsafe fn stream_new<T>(
    vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
    unsafe { raw_stream_new(vtable) }
}
167
168/// Helper function to create a new read/write pair for a component model
169/// stream.
170pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
171where
172    O: StreamOps + Clone,
173{
174    unsafe {
175        let handles = ops.new();
176        let reader = handles as u32;
177        let writer = (handles >> 32) as u32;
178        rtdebug!("stream.new() = [{writer}, {reader}]");
179        (
180            RawStreamWriter::new(writer, ops.clone()),
181            RawStreamReader::new(reader, ops),
182        )
183    }
184}
185
/// Represents the writable end of a Component Model `stream`.
///
/// This is [`RawStreamWriter`] specialized to a `&'static` [`StreamVtable`].
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
188
/// Represents the writable end of a Component Model `stream`.
///
/// Dropping this value calls the `stream.drop-writable` intrinsic on the
/// underlying handle.
pub struct RawStreamWriter<O: StreamOps> {
    /// Component-model handle index passed to the write/cancel/drop
    /// intrinsics.
    handle: u32,
    /// Intrinsics and lift/lower operations for this stream's payload type.
    ops: O,
    /// When `true`, starting a write reports `DROPPED` immediately instead of
    /// calling the `stream.write` intrinsic.
    done: bool,
}
195
196impl<O> RawStreamWriter<O>
197where
198    O: StreamOps,
199{
200    #[doc(hidden)]
201    pub unsafe fn new(handle: u32, ops: O) -> Self {
202        Self {
203            handle,
204            ops,
205            done: false,
206        }
207    }
208
209    /// Returns the index of the component-model handle that this stream is
210    /// using.
211    pub fn handle(&self) -> u32 {
212        self.handle
213    }
214
215    /// Initiate a write of the `values` provided into this stream.
216    ///
217    /// This method is akin to an `async fn` except that the returned
218    /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
219    /// to re-acquire undelivered values.
220    ///
221    /// This method will perform at most a single write of the `values`
222    /// provided. The returned future will resolve once the write has completed.
223    ///
224    /// # Return Values
225    ///
226    /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
227    /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
228    /// were sent from `values` into this writer. A result of
229    /// `StreamResult::Dropped` means that no values were sent and the other side
230    /// has hung-up and sending values will no longer be possible.
231    ///
232    /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
233    /// original `values` provided here. That can be used to re-acquire `values`
234    /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
235    /// internal cursor of how many values have been written and if the write
236    /// should be resumed to write the entire buffer then the
237    /// [`StreamWriter::write_buf`] method can be used to resume writing at the
238    /// next value in the buffer.
239    ///
240    /// # Cancellation
241    ///
242    /// The returned [`StreamWrite`] future can be cancelled like any other Rust
243    /// future via `drop`, but this means that `values` will be lost within the
244    /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
245    /// in-progress write that is being done with `values`. This is effectively
246    /// a way of forcing the future to immediately resolve.
247    ///
248    /// Note that if this future is cancelled via `drop` it does not mean that
249    /// no values were sent. It may be possible that values were still sent
250    /// despite being cancelled. Cancelling a write and determining what
251    /// happened must be done with [`StreamWrite::cancel`].
252    pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
253        self.write_buf(AbiBuffer::new(values, self.ops.clone()))
254    }
255
256    /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
257    /// instead of `Vec<T>`.
258    pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
259        RawStreamWrite {
260            op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
261        }
262    }
263
264    /// Writes all of the `values` provided into this stream.
265    ///
266    /// This is a higher-level method than [`StreamWriter::write`] and does not
267    /// expose cancellation for example. This will successively attempt to write
268    /// all of `values` provided into this stream. Upon completion the same
269    /// vector will be returned and any remaining elements in the vector were
270    /// not sent because the stream was dropped.
271    pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
272        // Perform an initial write which converts `values` into `AbiBuffer`.
273        let (mut status, mut buf) = self.write(values).await;
274
275        // While the previous write completed and there's still remaining items
276        // in the buffer, perform another write.
277        while let StreamResult::Complete(_) = status {
278            if buf.remaining() == 0 {
279                break;
280            }
281            (status, buf) = self.write_buf(buf).await;
282
283            // FIXME(WebAssembly/component-model#490)
284            if status == StreamResult::Cancelled {
285                status = StreamResult::Complete(0);
286            }
287        }
288
289        // Return back any values that weren't written by shifting them to the
290        // front of the returned vector.
291        assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
292        buf.into_vec()
293    }
294
295    /// Writes the singular `value` provided
296    ///
297    /// This is a higher-level method than [`StreamWriter::write`] and does not
298    /// expose cancellation for example. This will attempt to send `value` on
299    /// this stream.
300    ///
301    /// If the other end hangs up then the value is returned back as
302    /// `Some(value)`, otherwise `None` is returned indicating the value was
303    /// sent.
304    pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
305        // TODO: can probably be a bit more efficient about this and avoid
306        // moving `value` onto the heap in some situations, but that's left as
307        // an optimization for later.
308        self.write_all(std::vec![value]).await.pop()
309    }
310}
311
312impl<O> fmt::Debug for RawStreamWriter<O>
313where
314    O: StreamOps,
315{
316    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317        f.debug_struct("StreamWriter")
318            .field("handle", &self.handle)
319            .finish()
320    }
321}
322
323impl<O> Drop for RawStreamWriter<O>
324where
325    O: StreamOps,
326{
327    fn drop(&mut self) {
328        rtdebug!("stream.drop-writable({})", self.handle);
329        unsafe {
330            self.ops.drop_writable(self.handle);
331        }
332    }
333}
334
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is [`RawStreamWrite`] specialized to a `&'static` [`StreamVtable`].
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
337
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, AbiBuffer<O>)`; it is
/// created by `RawStreamWriter::write` and `RawStreamWriter::write_buf`.
pub struct RawStreamWrite<'a, O: StreamOps> {
    /// The underlying waitable operation that polling and cancellation are
    /// delegated to.
    op: WaitableOperation<StreamWriteOp<'a, O>>,
}
342
/// State for one write operation; implements `WaitableOp` so that
/// `WaitableOperation` can drive it.
struct StreamWriteOp<'a, O: StreamOps> {
    /// The writer this operation runs on, borrowed exclusively for the
    /// duration of the operation.
    writer: &'a mut RawStreamWriter<O>,
}
346
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    Complete(usize),
    /// No values were transferred (written or read), the other end has
    /// dropped its handle.
    Dropped,
    /// No values were transferred (written or read), the operation was
    /// cancelled.
    Cancelled,
}
361
362unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
363where
364    O: StreamOps,
365{
366    type Start = AbiBuffer<O>;
367    type InProgress = AbiBuffer<O>;
368    type Result = (StreamResult, AbiBuffer<O>);
369    type Cancel = (StreamResult, AbiBuffer<O>);
370
371    fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
372        if self.writer.done {
373            return (DROPPED, buf);
374        }
375
376        let (ptr, len) = buf.abi_ptr_and_len();
377        // SAFETY: sure hope this is safe, everything in this module and
378        // `AbiBuffer` is trying to make this safe.
379        let code = unsafe { self.writer.ops.start_write(self.writer.handle, ptr, len) };
380        rtdebug!(
381            "stream.write({}, {ptr:?}, {len}) = {code:#x}",
382            self.writer.handle
383        );
384        (code, buf)
385    }
386
387    fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
388        (StreamResult::Cancelled, buf)
389    }
390
391    fn in_progress_update(
392        &mut self,
393        mut buf: Self::InProgress,
394        code: u32,
395    ) -> Result<Self::Result, Self::InProgress> {
396        match ReturnCode::decode(code) {
397            ReturnCode::Blocked => Err(buf),
398            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
399            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
400            code @ (ReturnCode::Completed(amt)
401            | ReturnCode::Dropped(amt)
402            | ReturnCode::Cancelled(amt)) => {
403                let amt = amt.try_into().unwrap();
404                buf.advance(amt);
405                if let ReturnCode::Dropped(_) = code {
406                    self.writer.done = true;
407                }
408                Ok((StreamResult::Complete(amt), buf))
409            }
410        }
411    }
412
413    fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
414        self.writer.handle
415    }
416
417    fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
418        // SAFETY: we're managing `writer` and all the various operational bits,
419        // so this relies on `WaitableOperation` being safe.
420        let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
421        rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
422        code
423    }
424
425    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
426        result
427    }
428}
429
430impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
431    type Output = (StreamResult, AbiBuffer<O>);
432
433    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434        self.pin_project().poll_complete(cx)
435    }
436}
437
438impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
439    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
440        // SAFETY: we've chosen that when `Self` is pinned that it translates to
441        // always pinning the inner field, so that's codified here.
442        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
443    }
444
445    /// Cancel this write if it hasn't already completed.
446    ///
447    /// This method can be used to cancel a write-in-progress and re-acquire
448    /// values being sent. Note that the result here may still indicate that
449    /// some values were written if the race to cancel the write was lost.
450    ///
451    /// # Panics
452    ///
453    /// Panics if the operation has already been completed via `Future::poll`,
454    /// or if this method is called twice.
455    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
456        self.pin_project().cancel()
457    }
458}
459
/// Represents the readable end of a Component Model `stream`.
///
/// This is [`RawStreamReader`] specialized to a `&'static` [`StreamVtable`].
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
462
/// Represents the readable end of a Component Model `stream`.
///
/// Dropping this value calls the `stream.drop-readable` intrinsic unless the
/// handle has been taken out with `take_handle`.
pub struct RawStreamReader<O: StreamOps> {
    /// Component-model handle index, or `u32::MAX` once the handle has been
    /// taken out of this reader.
    handle: AtomicU32,
    /// Intrinsics and lift/lower operations for this stream's payload type.
    ops: O,
    /// When `true`, starting a read reports `DROPPED` immediately instead of
    /// calling the `stream.read` intrinsic.
    done: bool,
}
469
470impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        f.debug_struct("StreamReader")
473            .field("handle", &self.handle)
474            .finish()
475    }
476}
477
478impl<O: StreamOps> RawStreamReader<O> {
479    #[doc(hidden)]
480    pub fn new(handle: u32, ops: O) -> Self {
481        Self {
482            handle: AtomicU32::new(handle),
483            ops,
484            done: false,
485        }
486    }
487
488    #[doc(hidden)]
489    pub fn take_handle(&self) -> u32 {
490        let ret = self.opt_handle().unwrap();
491        self.handle.store(u32::MAX, Relaxed);
492        ret
493    }
494
495    /// Returns the index of the component-model handle that this stream is
496    /// using.
497    pub fn handle(&self) -> u32 {
498        self.opt_handle().unwrap()
499    }
500
501    fn opt_handle(&self) -> Option<u32> {
502        match self.handle.load(Relaxed) {
503            u32::MAX => None,
504            other => Some(other),
505        }
506    }
507
508    /// Starts a new read operation on this stream into `buf`.
509    ///
510    /// This method will read values into the spare capacity of the `buf`
511    /// provided. If `buf` has no spare capacity then this will be equivalent
512    /// to a zero-length read.
513    ///
514    /// Upon completion the `buf` will be yielded back to the caller via the
515    /// completion of the [`StreamRead`] future.
516    ///
517    /// # Cancellation
518    ///
519    /// Cancelling the returned future can be done with `drop` like all Rust
520    /// futures, but it does not mean that no values were read. To accurately
521    /// determine if values were read the [`StreamRead::cancel`] method must be
522    /// used.
523    pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
524        RawStreamRead {
525            op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
526        }
527    }
528
529    /// Reads a single item from this stream.
530    ///
531    /// This is a higher-level method than [`StreamReader::read`] in that it
532    /// reads only a single item and does not expose control over cancellation.
533    pub async fn next(&mut self) -> Option<O::Payload> {
534        // TODO: should amortize this allocation and avoid doing it every time.
535        // Or somehow perhaps make this more optimal.
536        let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
537        buf.pop()
538    }
539
540    /// Reads all items from this stream and returns the list.
541    ///
542    /// This method will read all remaining items from this stream into a list
543    /// and await the stream to be dropped.
544    pub async fn collect(mut self) -> Vec<O::Payload> {
545        let mut ret = Vec::new();
546        loop {
547            // If there's no more spare capacity then reserve room for one item
548            // which should trigger `Vec`'s built-in resizing logic, which will
549            // free up likely more capacity than just one slot.
550            if ret.len() == ret.capacity() {
551                ret.reserve(1);
552            }
553            let (status, buf) = self.read(ret).await;
554            ret = buf;
555            match status {
556                StreamResult::Complete(_) => {}
557                StreamResult::Dropped => break,
558                StreamResult::Cancelled => unreachable!(),
559            }
560        }
561        ret
562    }
563}
564
565impl<O: StreamOps> Drop for RawStreamReader<O> {
566    fn drop(&mut self) {
567        let Some(handle) = self.opt_handle() else {
568            return;
569        };
570        unsafe {
571            rtdebug!("stream.drop-readable({})", handle);
572            self.ops.drop_readable(handle);
573        }
574    }
575}
576
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is [`RawStreamRead`] specialized to a `&'static` [`StreamVtable`].
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
579
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, Vec<O::Payload>)`; it is
/// created by `RawStreamReader::read`.
pub struct RawStreamRead<'a, O: StreamOps> {
    /// The underlying waitable operation that polling and cancellation are
    /// delegated to.
    op: WaitableOperation<StreamReadOp<'a, O>>,
}
584
/// State for one read operation; implements `WaitableOp` so that
/// `WaitableOperation` can drive it.
struct StreamReadOp<'a, O: StreamOps> {
    /// The reader this operation runs on, borrowed exclusively for the
    /// duration of the operation.
    reader: &'a mut RawStreamReader<O>,
}
588
589unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
590    type Start = Vec<O::Payload>;
591    type InProgress = (Vec<O::Payload>, Option<Cleanup>);
592    type Result = (StreamResult, Vec<O::Payload>);
593    type Cancel = (StreamResult, Vec<O::Payload>);
594
595    fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
596        if self.reader.done {
597            return (DROPPED, (buf, None));
598        }
599
600        let cap = buf.spare_capacity_mut();
601        let ptr;
602        let cleanup;
603        // If `T` requires a lifting operation, then allocate a slab of memory
604        // which will store the canonical ABI read. Otherwise we can use the
605        // raw capacity in `buf` itself.
606        if self.reader.ops.native_abi_matches_canonical_abi() {
607            ptr = cap.as_mut_ptr().cast();
608            cleanup = None;
609        } else {
610            let elem_layout = self.reader.ops.elem_layout();
611            let layout =
612                Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
613                    .unwrap();
614            (ptr, cleanup) = Cleanup::new(layout);
615        }
616        // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
617        // persist with this async operation itself.
618        let code = unsafe {
619            self.reader
620                .ops
621                .start_read(self.reader.handle(), ptr, cap.len())
622        };
623        rtdebug!(
624            "stream.read({}, {ptr:?}, {}) = {code:#x}",
625            self.reader.handle(),
626            cap.len()
627        );
628        (code, (buf, cleanup))
629    }
630
631    fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
632        (StreamResult::Cancelled, buf)
633    }
634
635    fn in_progress_update(
636        &mut self,
637        (mut buf, cleanup): Self::InProgress,
638        code: u32,
639    ) -> Result<Self::Result, Self::InProgress> {
640        match ReturnCode::decode(code) {
641            ReturnCode::Blocked => Err((buf, cleanup)),
642
643            // Note that the `cleanup`, if any, is discarded here.
644            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
645
646            // When an in-progress read is successfully cancelled then the
647            // allocation that was being read into, if any, is just discarded.
648            //
649            // TODO: should maybe thread this around like `AbiBuffer` to cache
650            // the read allocation?
651            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
652
653            code @ (ReturnCode::Completed(amt)
654            | ReturnCode::Dropped(amt)
655            | ReturnCode::Cancelled(amt)) => {
656                let amt = usize::try_from(amt).unwrap();
657                let cur_len = buf.len();
658                assert!(amt <= buf.capacity() - cur_len);
659
660                if self.reader.ops.native_abi_matches_canonical_abi() {
661                    // If no `lift` was necessary, then the results of this operation
662                    // were read directly into `buf`, so just update its length now that
663                    // values have been initialized.
664                    unsafe {
665                        buf.set_len(cur_len + amt);
666                    }
667                } else {
668                    // With a `lift` operation this now requires reading `amt` items
669                    // from `cleanup` and pushing them into `buf`.
670                    let mut ptr = cleanup
671                        .as_ref()
672                        .map(|c| c.ptr.as_ptr())
673                        .unwrap_or(ptr::null_mut());
674                    for _ in 0..amt {
675                        unsafe {
676                            buf.push(self.reader.ops.lift(ptr));
677                            ptr = ptr.add(self.reader.ops.elem_layout().size());
678                        }
679                    }
680                }
681
682                // Intentionally dispose of `cleanup` here as, if it was used, all
683                // allocations have been read from it and appended to `buf`.
684                drop(cleanup);
685                if let ReturnCode::Dropped(_) = code {
686                    self.reader.done = true;
687                }
688                Ok((StreamResult::Complete(amt), buf))
689            }
690        }
691    }
692
693    fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
694        self.reader.handle()
695    }
696
697    fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
698        // SAFETY: we're managing `reader` and all the various operational bits,
699        // so this relies on `WaitableOperation` being safe.
700        let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
701        rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
702        code
703    }
704
705    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
706        result
707    }
708}
709
710impl<O: StreamOps> Future for RawStreamRead<'_, O> {
711    type Output = (StreamResult, Vec<O::Payload>);
712
713    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
714        self.pin_project().poll_complete(cx)
715    }
716}
717
718impl<'a, O> RawStreamRead<'a, O>
719where
720    O: StreamOps,
721{
722    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
723        // SAFETY: we've chosen that when `Self` is pinned that it translates to
724        // always pinning the inner field, so that's codified here.
725        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
726    }
727
728    /// Cancel this read if it hasn't already completed.
729    ///
730    /// This method will initiate a cancellation operation for this active
731    /// read. This may race with the actual read itself and so this may actually
732    /// complete with some results.
733    ///
734    /// The final result of cancellation is returned, along with the original
735    /// buffer.
736    ///
737    /// # Panics
738    ///
739    /// Panics if the operation has already been completed via `Future::poll`,
740    /// or if this method is called twice.
741    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
742        self.pin_project().cancel()
743    }
744}