Skip to main content

wit_bindgen/rt/async_support/
stream_support.rs

1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode};
6use {
7    crate::rt::Cleanup,
8    std::{
9        alloc::Layout,
10        fmt,
11        future::Future,
12        pin::Pin,
13        ptr,
14        sync::atomic::{AtomicU32, Ordering::Relaxed},
15        task::{Context, Poll},
16        vec::Vec,
17    },
18};
19
/// Maximum size of a read/write operation as specified by the canonical ABI.
///
/// Element counts handed to the `stream.read` and `stream.write` intrinsics
/// by this module are clamped to this value, so a single operation never
/// requests more than `MAX_LENGTH` items.
const MAX_LENGTH: usize = (1 << 28) - 1;
22
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
///
/// NOTE(review): this is an `unsafe trait`, but the invariants implementors
/// must uphold are not spelled out in this file. Presumably the intrinsics,
/// the layout, and the lift/lower callbacks must all faithfully describe
/// `Self::Payload` — confirm.
#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
    /// The Rust type that's sent or received on this stream.
    type Payload: 'static;

    /// The `stream.new` intrinsic.
    ///
    /// The returned value packs both new handles: `raw_stream_new` takes the
    /// readable end from the low 32 bits and the writable end from the high
    /// 32 bits.
    fn new(&mut self) -> u64;

    /// The canonical ABI layout of the type that this stream is
    /// sending/receiving.
    fn elem_layout(&self) -> Layout;

    /// Returns whether `Self::Payload` is natively in its canonical ABI
    /// representation.
    ///
    /// If this returns `true` then no `lift` or `lower` is required and
    /// values can be transferred directly to/from Rust memory. If this
    /// returns `false` then values must go through `lift`/`lower`.
    fn native_abi_matches_canonical_abi(&self) -> bool;

    /// Returns whether `Self::Payload` has lists that need to be deallocated
    /// with `dealloc_lists`.
    fn contains_lists(&self) -> bool;

    /// Converts a Rust type to its canonical ABI representation, storing the
    /// result at `dst`.
    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
    /// Used to deallocate any Rust-owned lists in the canonical ABI
    /// representation for when a value is successfully sent but needs to be
    /// cleaned up.
    unsafe fn dealloc_lists(&mut self, dst: *mut u8);
    /// Converts from the canonical ABI representation to a Rust value.
    ///
    /// Despite its name, `dst` is the pointer the canonical ABI value is read
    /// from.
    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
    /// The `stream.write` intrinsic.
    ///
    /// The returned status code is interpreted with `ReturnCode::decode`.
    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
    /// The `stream.read` intrinsic.
    ///
    /// The returned status code is interpreted with `ReturnCode::decode`.
    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
    /// The `stream.cancel-read` intrinsic
    unsafe fn cancel_read(&mut self, stream: u32) -> u32;
    /// The `stream.cancel-write` intrinsic
    unsafe fn cancel_write(&mut self, stream: u32) -> u32;
    /// The `stream.drop-readable` intrinsic
    unsafe fn drop_readable(&mut self, stream: u32);
    /// The `stream.drop-writable` intrinsic
    unsafe fn drop_writable(&mut self, stream: u32);
}
/// Table of layout information and function pointers describing how values
/// of type `T` travel over a stream.
///
/// A `&StreamVtable<T>` implements [`StreamOps`] by forwarding to the entries
/// stored here.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// An optional callback where if provided will lower an owned `T` value
    /// into the `dst` pointer.
    ///
    /// If this is called the ownership of all of `T`'s lists and resources are
    /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
    /// the canonical ABI layout.
    ///
    /// If this is `None` then it means that `T` has the same layout in-memory
    /// in Rust as it does in the canonical ABI. In such a situation the
    /// lower/lift operation can be dropped.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`, and like `lower` if this is missing then it means that
    /// `T` has the same in-memory representation in Rust and the canonical ABI.
    ///
    /// The `StreamOps` implementation for `&StreamVtable<T>` uses the absence
    /// of this callback to decide that no lifting/lowering is necessary.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}
114
115unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
116    type Payload = T;
117
118    fn new(&mut self) -> u64 {
119        unsafe { (self.new)() }
120    }
121    fn elem_layout(&self) -> Layout {
122        self.layout
123    }
124    fn native_abi_matches_canonical_abi(&self) -> bool {
125        self.lift.is_none()
126    }
127    fn contains_lists(&self) -> bool {
128        self.dealloc_lists.is_some()
129    }
130    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
131        if let Some(f) = self.lower {
132            unsafe { f(payload, dst) }
133        }
134    }
135    unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
136        if let Some(f) = self.dealloc_lists {
137            unsafe { f(dst) }
138        }
139    }
140    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
141        unsafe { (self.lift.unwrap())(dst) }
142    }
143    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
144        unsafe { (self.start_write)(stream, val, amt) }
145    }
146    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
147        unsafe { (self.start_read)(stream, val, amt) }
148    }
149    unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
150        unsafe { (self.cancel_read)(stream) }
151    }
152    unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
153        unsafe { (self.cancel_write)(stream) }
154    }
155    unsafe fn drop_readable(&mut self, stream: u32) {
156        unsafe { (self.drop_readable)(stream) }
157    }
158    unsafe fn drop_writable(&mut self, stream: u32) {
159        unsafe { (self.drop_writable)(stream) }
160    }
161}
162
/// Helper function to create a new read/write pair for a component model
/// stream.
///
/// This is a thin wrapper around [`raw_stream_new`] which uses a `&'static`
/// [`StreamVtable`] as the stream's operations.
///
/// # Safety
///
/// NOTE(review): the exact requirements are not documented in this file.
/// Presumably `vtable` must accurately describe `T` and refer to the
/// intrinsics of a matching component model stream type — confirm.
pub unsafe fn stream_new<T>(
    vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
    unsafe { raw_stream_new(vtable) }
}
170
171/// Helper function to create a new read/write pair for a component model
172/// stream.
173pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
174where
175    O: StreamOps + Clone,
176{
177    unsafe {
178        let handles = ops.new();
179        let reader = handles as u32;
180        let writer = (handles >> 32) as u32;
181        rtdebug!("stream.new() = [{writer}, {reader}]");
182        (
183            RawStreamWriter::new(writer, ops.clone()),
184            RawStreamReader::new(reader, ops),
185        )
186    }
187}
188
/// Represents the writable end of a Component Model `stream`.
///
/// This is [`RawStreamWriter`] specialized to operations provided by a
/// `&'static` [`StreamVtable<T>`].
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
191
/// Represents the writable end of a Component Model `stream`.
///
/// Dropping this value invokes `stream.drop-writable` on the handle.
pub struct RawStreamWriter<O: StreamOps> {
    // Index of the component-model handle for the writable end.
    handle: u32,
    // Intrinsics and lift/lower operations used to drive this stream.
    ops: O,
    // Set once a write reported that the other end was dropped after some
    // values had still been transferred; later writes then report `DROPPED`
    // without invoking the intrinsic.
    done: bool,
}
198
199impl<O> RawStreamWriter<O>
200where
201    O: StreamOps,
202{
203    #[doc(hidden)]
204    pub unsafe fn new(handle: u32, ops: O) -> Self {
205        Self {
206            handle,
207            ops,
208            done: false,
209        }
210    }
211
212    /// Returns the index of the component-model handle that this stream is
213    /// using.
214    pub fn handle(&self) -> u32 {
215        self.handle
216    }
217
218    /// Initiate a write of the `values` provided into this stream.
219    ///
220    /// This method is akin to an `async fn` except that the returned
221    /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
222    /// to re-acquire undelivered values.
223    ///
224    /// This method will perform at most a single write of the `values`
225    /// provided. The returned future will resolve once the write has completed.
226    ///
227    /// # Return Values
228    ///
229    /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
230    /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
231    /// were sent from `values` into this writer. A result of
232    /// `StreamResult::Dropped` means that no values were sent and the other side
233    /// has hung-up and sending values will no longer be possible.
234    ///
235    /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
236    /// original `values` provided here. That can be used to re-acquire `values`
237    /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
238    /// internal cursor of how many values have been written and if the write
239    /// should be resumed to write the entire buffer then the
240    /// [`StreamWriter::write_buf`] method can be used to resume writing at the
241    /// next value in the buffer.
242    ///
243    /// # Cancellation
244    ///
245    /// The returned [`StreamWrite`] future can be cancelled like any other Rust
246    /// future via `drop`, but this means that `values` will be lost within the
247    /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
248    /// in-progress write that is being done with `values`. This is effectively
249    /// a way of forcing the future to immediately resolve.
250    ///
251    /// Note that if this future is cancelled via `drop` it does not mean that
252    /// no values were sent. It may be possible that values were still sent
253    /// despite being cancelled. Cancelling a write and determining what
254    /// happened must be done with [`StreamWrite::cancel`].
255    pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
256        self.write_buf(AbiBuffer::new(values, self.ops.clone()))
257    }
258
259    /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
260    /// instead of `Vec<T>`.
261    pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
262        RawStreamWrite {
263            op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
264        }
265    }
266
267    /// Writes all of the `values` provided into this stream.
268    ///
269    /// This is a higher-level method than [`StreamWriter::write`] and does not
270    /// expose cancellation for example. This will successively attempt to write
271    /// all of `values` provided into this stream. Upon completion the same
272    /// vector will be returned and any remaining elements in the vector were
273    /// not sent because the stream was dropped.
274    pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
275        // Perform an initial write which converts `values` into `AbiBuffer`.
276        let (mut status, mut buf) = self.write(values).await;
277
278        // While the previous write completed and there's still remaining items
279        // in the buffer, perform another write.
280        while let StreamResult::Complete(_) = status {
281            if buf.remaining() == 0 {
282                break;
283            }
284            (status, buf) = self.write_buf(buf).await;
285
286            // FIXME(WebAssembly/component-model#490)
287            if status == StreamResult::Cancelled {
288                status = StreamResult::Complete(0);
289            }
290        }
291
292        // Return back any values that weren't written by shifting them to the
293        // front of the returned vector.
294        assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
295        buf.into_vec()
296    }
297
298    /// Writes the singular `value` provided
299    ///
300    /// This is a higher-level method than [`StreamWriter::write`] and does not
301    /// expose cancellation for example. This will attempt to send `value` on
302    /// this stream.
303    ///
304    /// If the other end hangs up then the value is returned back as
305    /// `Some(value)`, otherwise `None` is returned indicating the value was
306    /// sent.
307    pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
308        // TODO: can probably be a bit more efficient about this and avoid
309        // moving `value` onto the heap in some situations, but that's left as
310        // an optimization for later.
311        self.write_all(std::vec![value]).await.pop()
312    }
313}
314
315impl<O> fmt::Debug for RawStreamWriter<O>
316where
317    O: StreamOps,
318{
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        f.debug_struct("StreamWriter")
321            .field("handle", &self.handle)
322            .finish()
323    }
324}
325
326impl<O> Drop for RawStreamWriter<O>
327where
328    O: StreamOps,
329{
330    fn drop(&mut self) {
331        rtdebug!("stream.drop-writable({})", self.handle);
332        unsafe {
333            self.ops.drop_writable(self.handle);
334        }
335    }
336}
337
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is [`RawStreamWrite`] specialized to operations provided by a
/// `&'static` [`StreamVtable<T>`].
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
340
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, AbiBuffer<O>)`; see its
/// `cancel` method for cancelling an in-progress write.
pub struct RawStreamWrite<'a, O: StreamOps> {
    // The underlying waitable operation; polling and cancellation are both
    // forwarded to it.
    op: WaitableOperation<StreamWriteOp<'a, O>>,
}
345
/// State for one write, implementing `WaitableOp` so it can be driven by
/// `WaitableOperation`.
struct StreamWriteOp<'a, O: StreamOps> {
    // The writer being written to, borrowed for the duration of the write.
    writer: &'a mut RawStreamWriter<O>,
}
349
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    ///
    /// This is also what is reported when a nonzero number of values were
    /// transferred before the other end dropped its handle or before a
    /// cancellation took effect.
    Complete(usize),
    /// No values were transferred, the other end has dropped its handle.
    Dropped,
    /// No values were transferred, the operation was cancelled.
    Cancelled,
}
364
365unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
366where
367    O: StreamOps,
368{
369    type Start = AbiBuffer<O>;
370    type InProgress = AbiBuffer<O>;
371    type Result = (StreamResult, AbiBuffer<O>);
372    type Cancel = (StreamResult, AbiBuffer<O>);
373
374    fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
375        if self.writer.done {
376            return (DROPPED, buf);
377        }
378
379        let (ptr, len) = buf.abi_ptr_and_len();
380        // SAFETY: sure hope this is safe, everything in this module and
381        // `AbiBuffer` is trying to make this safe.
382        let code = unsafe {
383            self.writer
384                .ops
385                .start_write(self.writer.handle, ptr, len.min(MAX_LENGTH))
386        };
387        rtdebug!(
388            "stream.write({}, {ptr:?}, {len}) = {code:#x}",
389            self.writer.handle
390        );
391        (code, buf)
392    }
393
394    fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
395        (StreamResult::Cancelled, buf)
396    }
397
398    fn in_progress_update(
399        &mut self,
400        mut buf: Self::InProgress,
401        code: u32,
402    ) -> Result<Self::Result, Self::InProgress> {
403        match ReturnCode::decode(code) {
404            ReturnCode::Blocked => Err(buf),
405            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
406            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
407            code @ (ReturnCode::Completed(amt)
408            | ReturnCode::Dropped(amt)
409            | ReturnCode::Cancelled(amt)) => {
410                let amt = amt.try_into().unwrap();
411                buf.advance(amt);
412                if let ReturnCode::Dropped(_) = code {
413                    self.writer.done = true;
414                }
415                Ok((StreamResult::Complete(amt), buf))
416            }
417        }
418    }
419
420    fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
421        self.writer.handle
422    }
423
424    fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
425        // SAFETY: we're managing `writer` and all the various operational bits,
426        // so this relies on `WaitableOperation` being safe.
427        let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
428        rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
429        code
430    }
431
432    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
433        result
434    }
435}
436
437impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
438    type Output = (StreamResult, AbiBuffer<O>);
439
440    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441        self.pin_project().poll_complete(cx)
442    }
443}
444
445impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
446    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
447        // SAFETY: we've chosen that when `Self` is pinned that it translates to
448        // always pinning the inner field, so that's codified here.
449        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
450    }
451
452    /// Cancel this write if it hasn't already completed.
453    ///
454    /// This method can be used to cancel a write-in-progress and re-acquire
455    /// values being sent. Note that the result here may still indicate that
456    /// some values were written if the race to cancel the write was lost.
457    ///
458    /// # Panics
459    ///
460    /// Panics if the operation has already been completed via `Future::poll`,
461    /// or if this method is called twice.
462    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
463        self.pin_project().cancel()
464    }
465}
466
/// Represents the readable end of a Component Model `stream`.
///
/// This is [`RawStreamReader`] specialized to operations provided by a
/// `&'static` [`StreamVtable<T>`].
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
469
/// Represents the readable end of a Component Model `stream`.
///
/// Dropping this value invokes `stream.drop-readable` on the handle, unless
/// the handle was previously removed with `take_handle`.
pub struct RawStreamReader<O: StreamOps> {
    // Index of the component-model handle for the readable end. The value
    // `u32::MAX` is a sentinel meaning the handle has been taken.
    handle: AtomicU32,
    // Intrinsics and lift/lower operations used to drive this stream.
    ops: O,
    // Set once a read reported that the other end was dropped after some
    // values had still been transferred; later reads then report `DROPPED`
    // without invoking the intrinsic.
    done: bool,
}
476
477impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
478    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
479        f.debug_struct("StreamReader")
480            .field("handle", &self.handle)
481            .finish()
482    }
483}
484
485impl<O: StreamOps> RawStreamReader<O> {
486    #[doc(hidden)]
487    pub fn new(handle: u32, ops: O) -> Self {
488        Self {
489            handle: AtomicU32::new(handle),
490            ops,
491            done: false,
492        }
493    }
494
495    #[doc(hidden)]
496    pub fn take_handle(&self) -> u32 {
497        let ret = self.opt_handle().unwrap();
498        self.handle.store(u32::MAX, Relaxed);
499        ret
500    }
501
502    /// Returns the index of the component-model handle that this stream is
503    /// using.
504    pub fn handle(&self) -> u32 {
505        self.opt_handle().unwrap()
506    }
507
508    fn opt_handle(&self) -> Option<u32> {
509        match self.handle.load(Relaxed) {
510            u32::MAX => None,
511            other => Some(other),
512        }
513    }
514
515    /// Starts a new read operation on this stream into `buf`.
516    ///
517    /// This method will read values into the spare capacity of the `buf`
518    /// provided. If `buf` has no spare capacity then this will be equivalent
519    /// to a zero-length read.
520    ///
521    /// Upon completion the `buf` will be yielded back to the caller via the
522    /// completion of the [`StreamRead`] future.
523    ///
524    /// # Cancellation
525    ///
526    /// Cancelling the returned future can be done with `drop` like all Rust
527    /// futures, but it does not mean that no values were read. To accurately
528    /// determine if values were read the [`StreamRead::cancel`] method must be
529    /// used.
530    pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
531        RawStreamRead {
532            op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
533        }
534    }
535
536    /// Reads a single item from this stream.
537    ///
538    /// This is a higher-level method than [`StreamReader::read`] in that it
539    /// reads only a single item and does not expose control over cancellation.
540    pub async fn next(&mut self) -> Option<O::Payload> {
541        // TODO: should amortize this allocation and avoid doing it every time.
542        // Or somehow perhaps make this more optimal.
543        let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
544        buf.pop()
545    }
546
547    /// Reads all items from this stream and returns the list.
548    ///
549    /// This method will read all remaining items from this stream into a list
550    /// and await the stream to be dropped.
551    pub async fn collect(mut self) -> Vec<O::Payload> {
552        let mut ret = Vec::new();
553        loop {
554            // If there's no more spare capacity then reserve room for one item
555            // which should trigger `Vec`'s built-in resizing logic, which will
556            // free up likely more capacity than just one slot.
557            if ret.len() == ret.capacity() {
558                ret.reserve(1);
559            }
560            let (status, buf) = self.read(ret).await;
561            ret = buf;
562            match status {
563                StreamResult::Complete(_) => {}
564                StreamResult::Dropped => break,
565                StreamResult::Cancelled => unreachable!(),
566            }
567        }
568        ret
569    }
570}
571
572impl<O: StreamOps> Drop for RawStreamReader<O> {
573    fn drop(&mut self) {
574        let Some(handle) = self.opt_handle() else {
575            return;
576        };
577        unsafe {
578            rtdebug!("stream.drop-readable({})", handle);
579            self.ops.drop_readable(handle);
580        }
581    }
582}
583
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is [`RawStreamRead`] specialized to operations provided by a
/// `&'static` [`StreamVtable<T>`].
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
586
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, Vec<O::Payload>)`; see its
/// `cancel` method for cancelling an in-progress read.
pub struct RawStreamRead<'a, O: StreamOps> {
    // The underlying waitable operation; polling and cancellation are both
    // forwarded to it.
    op: WaitableOperation<StreamReadOp<'a, O>>,
}
591
/// State for one read, implementing `WaitableOp` so it can be driven by
/// `WaitableOperation`.
struct StreamReadOp<'a, O: StreamOps> {
    // The reader being read from, borrowed for the duration of the read.
    reader: &'a mut RawStreamReader<O>,
}
595
596unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
597    type Start = Vec<O::Payload>;
598    type InProgress = (Vec<O::Payload>, Option<Cleanup>);
599    type Result = (StreamResult, Vec<O::Payload>);
600    type Cancel = (StreamResult, Vec<O::Payload>);
601
602    fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
603        if self.reader.done {
604            return (DROPPED, (buf, None));
605        }
606
607        let cap = buf.spare_capacity_mut();
608        let ptr;
609        let cleanup;
610        // If `T` requires a lifting operation, then allocate a slab of memory
611        // which will store the canonical ABI read. Otherwise we can use the
612        // raw capacity in `buf` itself.
613        if self.reader.ops.native_abi_matches_canonical_abi() {
614            ptr = cap.as_mut_ptr().cast();
615            cleanup = None;
616        } else {
617            let elem_layout = self.reader.ops.elem_layout();
618            let layout =
619                Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
620                    .unwrap();
621            (ptr, cleanup) = Cleanup::new(layout);
622        }
623        // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
624        // persist with this async operation itself.
625        let code = unsafe {
626            self.reader
627                .ops
628                .start_read(self.reader.handle(), ptr, cap.len().min(MAX_LENGTH))
629        };
630        rtdebug!(
631            "stream.read({}, {ptr:?}, {}) = {code:#x}",
632            self.reader.handle(),
633            cap.len()
634        );
635        (code, (buf, cleanup))
636    }
637
638    fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
639        (StreamResult::Cancelled, buf)
640    }
641
642    fn in_progress_update(
643        &mut self,
644        (mut buf, cleanup): Self::InProgress,
645        code: u32,
646    ) -> Result<Self::Result, Self::InProgress> {
647        match ReturnCode::decode(code) {
648            ReturnCode::Blocked => Err((buf, cleanup)),
649
650            // Note that the `cleanup`, if any, is discarded here.
651            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
652
653            // When an in-progress read is successfully cancelled then the
654            // allocation that was being read into, if any, is just discarded.
655            //
656            // TODO: should maybe thread this around like `AbiBuffer` to cache
657            // the read allocation?
658            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
659
660            code @ (ReturnCode::Completed(amt)
661            | ReturnCode::Dropped(amt)
662            | ReturnCode::Cancelled(amt)) => {
663                let amt = usize::try_from(amt).unwrap();
664                let cur_len = buf.len();
665                assert!(amt <= buf.capacity() - cur_len);
666
667                if self.reader.ops.native_abi_matches_canonical_abi() {
668                    // If no `lift` was necessary, then the results of this operation
669                    // were read directly into `buf`, so just update its length now that
670                    // values have been initialized.
671                    unsafe {
672                        buf.set_len(cur_len + amt);
673                    }
674                } else {
675                    // With a `lift` operation this now requires reading `amt` items
676                    // from `cleanup` and pushing them into `buf`.
677                    let mut ptr = cleanup
678                        .as_ref()
679                        .map(|c| c.ptr.as_ptr())
680                        .unwrap_or(ptr::null_mut());
681                    for _ in 0..amt {
682                        unsafe {
683                            buf.push(self.reader.ops.lift(ptr));
684                            ptr = ptr.add(self.reader.ops.elem_layout().size());
685                        }
686                    }
687                }
688
689                // Intentionally dispose of `cleanup` here as, if it was used, all
690                // allocations have been read from it and appended to `buf`.
691                drop(cleanup);
692                if let ReturnCode::Dropped(_) = code {
693                    self.reader.done = true;
694                }
695                Ok((StreamResult::Complete(amt), buf))
696            }
697        }
698    }
699
700    fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
701        self.reader.handle()
702    }
703
704    fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
705        // SAFETY: we're managing `reader` and all the various operational bits,
706        // so this relies on `WaitableOperation` being safe.
707        let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
708        rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
709        code
710    }
711
712    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
713        result
714    }
715}
716
717impl<O: StreamOps> Future for RawStreamRead<'_, O> {
718    type Output = (StreamResult, Vec<O::Payload>);
719
720    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
721        self.pin_project().poll_complete(cx)
722    }
723}
724
725impl<'a, O> RawStreamRead<'a, O>
726where
727    O: StreamOps,
728{
729    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
730        // SAFETY: we've chosen that when `Self` is pinned that it translates to
731        // always pinning the inner field, so that's codified here.
732        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
733    }
734
735    /// Cancel this read if it hasn't already completed.
736    ///
737    /// This method will initiate a cancellation operation for this active
738    /// read. This may race with the actual read itself and so this may actually
739    /// complete with some results.
740    ///
741    /// The final result of cancellation is returned, along with the original
742    /// buffer.
743    ///
744    /// # Panics
745    ///
746    /// Panics if the operation has already been completed via `Future::poll`,
747    /// or if this method is called twice.
748    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
749        self.pin_project().cancel()
750    }
751}