wit_bindgen/rt/async_support/
stream_support.rs

//! For a high-level overview of how this module is implemented see the
//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7    crate::rt::Cleanup,
8    std::{
9        alloc::Layout,
10        fmt,
11        future::Future,
12        marker,
13        pin::Pin,
14        ptr,
15        sync::atomic::{AtomicU32, Ordering::Relaxed},
16        task::{Context, Poll},
17        vec::Vec,
18    },
19};
20
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// Optional callback which lowers an owned `T` value into the `dst`
    /// pointer.
    ///
    /// When called, ownership of all of `T`'s lists and resources is passed to
    /// `dst`, possibly by reallocating if `T`'s layout differs from the
    /// canonical ABI layout.
    ///
    /// `None` means that `T` has the same layout in-memory in Rust as it does
    /// in the canonical ABI, in which case the lower/lift operation can be
    /// skipped entirely.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`: produces an owned `T` from its canonical ABI
    /// representation at `dst`.
    ///
    /// Like `lower`, `None` means that `T` has the same in-memory
    /// representation in Rust and the canonical ABI.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    ///
    /// The returned `u64` packs both ends of the new stream: the readable
    /// handle in the low 32 bits and the writable handle in the high 32 bits.
    pub new: unsafe extern "C" fn() -> u64,
}
66
67/// Helper function to create a new read/write pair for a component model
68/// stream.
69pub unsafe fn stream_new<T>(
70    vtable: &'static StreamVtable<T>,
71) -> (StreamWriter<T>, StreamReader<T>) {
72    unsafe {
73        let handles = (vtable.new)();
74        let reader = handles as u32;
75        let writer = (handles >> 32) as u32;
76        rtdebug!("stream.new() = [{writer}, {reader}]");
77        (
78            StreamWriter::new(writer, vtable),
79            StreamReader::new(reader, vtable),
80        )
81    }
82}
83
84/// Represents the writable end of a Component Model `stream`.
85pub struct StreamWriter<T: 'static> {
86    handle: u32,
87    vtable: &'static StreamVtable<T>,
88    done: bool,
89}
90
91impl<T> StreamWriter<T> {
92    #[doc(hidden)]
93    pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
94        Self {
95            handle,
96            vtable,
97            done: false,
98        }
99    }
100
101    /// Initiate a write of the `values` provided into this stream.
102    ///
103    /// This method is akin to an `async fn` except that the returned
104    /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
105    /// to re-acquire undelivered values.
106    ///
107    /// This method will perform at most a single write of the `values`
108    /// provided. The returned future will resolve once the write has completed.
109    ///
110    /// # Return Values
111    ///
112    /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
113    /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
114    /// were sent from `values` into this writer. A result of
115    /// `StreamResult::Dropped` means that no values were sent and the other side
116    /// has hung-up and sending values will no longer be possible.
117    ///
118    /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
119    /// original `values` provided here. That can be used to re-acquire `values`
120    /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
121    /// internal cursor of how many values have been written and if the write
122    /// should be resumed to write the entire buffer then the
123    /// [`StreamWriter::write_buf`] method can be used to resume writing at the
124    /// next value in the buffer.
125    ///
126    /// # Cancellation
127    ///
128    /// The returned [`StreamWrite`] future can be cancelled like any other Rust
129    /// future via `drop`, but this means that `values` will be lost within the
130    /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
131    /// in-progress write that is being done with `values`. This is effectively
132    /// a way of forcing the future to immediately resolve.
133    ///
134    /// Note that if this future is cancelled via `drop` it does not mean that
135    /// no values were sent. It may be possible that values were still sent
136    /// despite being cancelled. Cancelling a write and determining what
137    /// happened must be done with [`StreamWrite::cancel`].
138    pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
139        self.write_buf(AbiBuffer::new(values, self.vtable))
140    }
141
142    /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
143    /// instead of `Vec<T>`.
144    pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
145        StreamWrite {
146            op: WaitableOperation::new((self, values)),
147        }
148    }
149
150    /// Writes all of the `values` provided into this stream.
151    ///
152    /// This is a higher-level method than [`StreamWriter::write`] and does not
153    /// expose cancellation for example. This will successively attempt to write
154    /// all of `values` provided into this stream. Upon completion the same
155    /// vector will be returned and any remaining elements in the vector were
156    /// not sent because the stream was dropped.
157    pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
158        // Perform an initial write which converts `values` into `AbiBuffer`.
159        let (mut status, mut buf) = self.write(values).await;
160
161        // While the previous write completed and there's still remaining items
162        // in the buffer, perform another write.
163        while let StreamResult::Complete(_) = status {
164            if buf.remaining() == 0 {
165                break;
166            }
167            (status, buf) = self.write_buf(buf).await;
168
169            // FIXME(WebAssembly/component-model#490)
170            if status == StreamResult::Cancelled {
171                status = StreamResult::Complete(0);
172            }
173        }
174
175        // Return back any values that weren't written by shifting them to the
176        // front of the returned vector.
177        assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
178        buf.into_vec()
179    }
180
181    /// Writes the singular `value` provided
182    ///
183    /// This is a higher-level method than [`StreamWriter::write`] and does not
184    /// expose cancellation for example. This will attempt to send `value` on
185    /// this stream.
186    ///
187    /// If the other end hangs up then the value is returned back as
188    /// `Some(value)`, otherwise `None` is returned indicating the value was
189    /// sent.
190    pub async fn write_one(&mut self, value: T) -> Option<T> {
191        // TODO: can probably be a bit more efficient about this and avoid
192        // moving `value` onto the heap in some situations, but that's left as
193        // an optimization for later.
194        self.write_all(std::vec![value]).await.pop()
195    }
196}
197
198impl<T> fmt::Debug for StreamWriter<T> {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        f.debug_struct("StreamWriter")
201            .field("handle", &self.handle)
202            .finish()
203    }
204}
205
206impl<T> Drop for StreamWriter<T> {
207    fn drop(&mut self) {
208        rtdebug!("stream.drop-writable({})", self.handle);
209        unsafe {
210            (self.vtable.drop_writable)(self.handle);
211        }
212    }
213}
214
215/// Represents a write operation which may be cancelled prior to completion.
216pub struct StreamWrite<'a, T: 'static> {
217    op: WaitableOperation<StreamWriteOp<'a, T>>,
218}
219
220struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);
221
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    Complete(usize),
    /// No values were transferred, the other end has dropped its handle.
    Dropped,
    /// No values were transferred, the operation was cancelled.
    Cancelled,
}
236
237unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
238where
239    T: 'static,
240{
241    type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
242    type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
243    type Result = (StreamResult, AbiBuffer<T>);
244    type Cancel = (StreamResult, AbiBuffer<T>);
245
246    fn start((writer, buf): Self::Start) -> (u32, Self::InProgress) {
247        if writer.done {
248            return (DROPPED, (writer, buf));
249        }
250
251        let (ptr, len) = buf.abi_ptr_and_len();
252        // SAFETY: sure hope this is safe, everything in this module and
253        // `AbiBuffer` is trying to make this safe.
254        let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
255        rtdebug!(
256            "stream.write({}, {ptr:?}, {len}) = {code:#x}",
257            writer.handle
258        );
259        (code, (writer, buf))
260    }
261
262    fn start_cancelled((_writer, buf): Self::Start) -> Self::Cancel {
263        (StreamResult::Cancelled, buf)
264    }
265
266    fn in_progress_update(
267        (writer, mut buf): Self::InProgress,
268        code: u32,
269    ) -> Result<Self::Result, Self::InProgress> {
270        match ReturnCode::decode(code) {
271            ReturnCode::Blocked => Err((writer, buf)),
272            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
273            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
274            code @ (ReturnCode::Completed(amt)
275            | ReturnCode::Dropped(amt)
276            | ReturnCode::Cancelled(amt)) => {
277                let amt = amt.try_into().unwrap();
278                buf.advance(amt);
279                if let ReturnCode::Dropped(_) = code {
280                    writer.done = true;
281                }
282                Ok((StreamResult::Complete(amt), buf))
283            }
284        }
285    }
286
287    fn in_progress_waitable((writer, _): &Self::InProgress) -> u32 {
288        writer.handle
289    }
290
291    fn in_progress_cancel((writer, _): &Self::InProgress) -> u32 {
292        // SAFETY: we're managing `writer` and all the various operational bits,
293        // so this relies on `WaitableOperation` being safe.
294        let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
295        rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
296        code
297    }
298
299    fn result_into_cancel(result: Self::Result) -> Self::Cancel {
300        result
301    }
302}
303
304impl<T: 'static> Future for StreamWrite<'_, T> {
305    type Output = (StreamResult, AbiBuffer<T>);
306
307    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
308        self.pin_project().poll_complete(cx)
309    }
310}
311
312impl<'a, T: 'static> StreamWrite<'a, T> {
313    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
314        // SAFETY: we've chosen that when `Self` is pinned that it translates to
315        // always pinning the inner field, so that's codified here.
316        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
317    }
318
319    /// Cancel this write if it hasn't already completed.
320    ///
321    /// This method can be used to cancel a write-in-progress and re-acquire
322    /// values being sent. Note that the result here may still indicate that
323    /// some values were written if the race to cancel the write was lost.
324    ///
325    /// # Panics
326    ///
327    /// Panics if the operation has already been completed via `Future::poll`,
328    /// or if this method is called twice.
329    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
330        self.pin_project().cancel()
331    }
332}
333
334/// Represents the readable end of a Component Model `stream`.
335pub struct StreamReader<T: 'static> {
336    handle: AtomicU32,
337    vtable: &'static StreamVtable<T>,
338    done: bool,
339}
340
341impl<T> fmt::Debug for StreamReader<T> {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        f.debug_struct("StreamReader")
344            .field("handle", &self.handle)
345            .finish()
346    }
347}
348
349impl<T> StreamReader<T> {
350    #[doc(hidden)]
351    pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
352        Self {
353            handle: AtomicU32::new(handle),
354            vtable,
355            done: false,
356        }
357    }
358
359    #[doc(hidden)]
360    pub fn take_handle(&self) -> u32 {
361        let ret = self.opt_handle().unwrap();
362        self.handle.store(u32::MAX, Relaxed);
363        ret
364    }
365
366    fn handle(&self) -> u32 {
367        self.opt_handle().unwrap()
368    }
369
370    fn opt_handle(&self) -> Option<u32> {
371        match self.handle.load(Relaxed) {
372            u32::MAX => None,
373            other => Some(other),
374        }
375    }
376
377    /// Starts a new read operation on this stream into `buf`.
378    ///
379    /// This method will read values into the spare capacity of the `buf`
380    /// provided. If `buf` has no spare capacity then this will be equivalent
381    /// to a zero-length read.
382    ///
383    /// Upon completion the `buf` will be yielded back to the caller via the
384    /// completion of the [`StreamRead`] future.
385    ///
386    /// # Cancellation
387    ///
388    /// Cancelling the returned future can be done with `drop` like all Rust
389    /// futures, but it does not mean that no values were read. To accurately
390    /// determine if values were read the [`StreamRead::cancel`] method must be
391    /// used.
392    pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
393        StreamRead {
394            op: WaitableOperation::new((self, buf)),
395        }
396    }
397
398    /// Reads a single item from this stream.
399    ///
400    /// This is a higher-level method than [`StreamReader::read`] in that it
401    /// reads only a single item and does not expose control over cancellation.
402    pub async fn next(&mut self) -> Option<T> {
403        // TODO: should amortize this allocation and avoid doing it every time.
404        // Or somehow perhaps make this more optimal.
405        let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
406        buf.pop()
407    }
408
409    /// Reads all items from this stream and returns the list.
410    ///
411    /// This method will read all remaining items from this stream into a list
412    /// and await the stream to be dropped.
413    pub async fn collect(mut self) -> Vec<T> {
414        let mut ret = Vec::new();
415        loop {
416            // If there's no more spare capacity then reserve room for one item
417            // which should trigger `Vec`'s built-in resizing logic, which will
418            // free up likely more capacity than just one slot.
419            if ret.len() == ret.capacity() {
420                ret.reserve(1);
421            }
422            let (status, buf) = self.read(ret).await;
423            ret = buf;
424            match status {
425                StreamResult::Complete(_) => {}
426                StreamResult::Dropped => break,
427                StreamResult::Cancelled => unreachable!(),
428            }
429        }
430        ret
431    }
432}
433
434impl<T> Drop for StreamReader<T> {
435    fn drop(&mut self) {
436        let Some(handle) = self.opt_handle() else {
437            return;
438        };
439        unsafe {
440            rtdebug!("stream.drop-readable({})", handle);
441            (self.vtable.drop_readable)(handle);
442        }
443    }
444}
445
446/// Represents a read operation which may be cancelled prior to completion.
447pub struct StreamRead<'a, T: 'static> {
448    op: WaitableOperation<StreamReadOp<'a, T>>,
449}
450
451struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);
452
453unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
454where
455    T: 'static,
456{
457    type Start = (&'a mut StreamReader<T>, Vec<T>);
458    type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
459    type Result = (StreamResult, Vec<T>);
460    type Cancel = (StreamResult, Vec<T>);
461
462    fn start((reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
463        if reader.done {
464            return (DROPPED, (reader, buf, None));
465        }
466
467        let cap = buf.spare_capacity_mut();
468        let ptr;
469        let cleanup;
470        // If `T` requires a lifting operation, then allocate a slab of memory
471        // which will store the canonical ABI read. Otherwise we can use the
472        // raw capacity in `buf` itself.
473        if reader.vtable.lift.is_some() {
474            let layout = Layout::from_size_align(
475                reader.vtable.layout.size() * cap.len(),
476                reader.vtable.layout.align(),
477            )
478            .unwrap();
479            (ptr, cleanup) = Cleanup::new(layout);
480        } else {
481            ptr = cap.as_mut_ptr().cast();
482            cleanup = None;
483        }
484        // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
485        // persist with this async operation itself.
486        let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
487        rtdebug!(
488            "stream.read({}, {ptr:?}, {}) = {code:#x}",
489            reader.handle(),
490            cap.len()
491        );
492        (code, (reader, buf, cleanup))
493    }
494
495    fn start_cancelled((_, buf): Self::Start) -> Self::Cancel {
496        (StreamResult::Cancelled, buf)
497    }
498
499    fn in_progress_update(
500        (reader, mut buf, cleanup): Self::InProgress,
501        code: u32,
502    ) -> Result<Self::Result, Self::InProgress> {
503        match ReturnCode::decode(code) {
504            ReturnCode::Blocked => Err((reader, buf, cleanup)),
505
506            // Note that the `cleanup`, if any, is discarded here.
507            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
508
509            // When an in-progress read is successfully cancelled then the
510            // allocation that was being read into, if any, is just discarded.
511            //
512            // TODO: should maybe thread this around like `AbiBuffer` to cache
513            // the read allocation?
514            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
515
516            code @ (ReturnCode::Completed(amt)
517            | ReturnCode::Dropped(amt)
518            | ReturnCode::Cancelled(amt)) => {
519                let amt = usize::try_from(amt).unwrap();
520                let cur_len = buf.len();
521                assert!(amt <= buf.capacity() - cur_len);
522
523                match reader.vtable.lift {
524                    // With a `lift` operation this now requires reading `amt` items
525                    // from `cleanup` and pushing them into `buf`.
526                    Some(lift) => {
527                        let mut ptr = cleanup
528                            .as_ref()
529                            .map(|c| c.ptr.as_ptr())
530                            .unwrap_or(ptr::null_mut());
531                        for _ in 0..amt {
532                            unsafe {
533                                buf.push(lift(ptr));
534                                ptr = ptr.add(reader.vtable.layout.size());
535                            }
536                        }
537                    }
538
539                    // If no `lift` was necessary, then the results of this operation
540                    // were read directly into `buf`, so just update its length now that
541                    // values have been initialized.
542                    None => unsafe { buf.set_len(cur_len + amt) },
543                }
544
545                // Intentionally dispose of `cleanup` here as, if it was used, all
546                // allocations have been read from it and appended to `buf`.
547                drop(cleanup);
548                if let ReturnCode::Dropped(_) = code {
549                    reader.done = true;
550                }
551                Ok((StreamResult::Complete(amt), buf))
552            }
553        }
554    }
555
556    fn in_progress_waitable((reader, ..): &Self::InProgress) -> u32 {
557        reader.handle()
558    }
559
560    fn in_progress_cancel((reader, ..): &Self::InProgress) -> u32 {
561        // SAFETY: we're managing `reader` and all the various operational bits,
562        // so this relies on `WaitableOperation` being safe.
563        let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
564        rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
565        code
566    }
567
568    fn result_into_cancel(result: Self::Result) -> Self::Cancel {
569        result
570    }
571}
572
573impl<T: 'static> Future for StreamRead<'_, T> {
574    type Output = (StreamResult, Vec<T>);
575
576    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
577        self.pin_project().poll_complete(cx)
578    }
579}
580
581impl<'a, T> StreamRead<'a, T> {
582    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
583        // SAFETY: we've chosen that when `Self` is pinned that it translates to
584        // always pinning the inner field, so that's codified here.
585        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
586    }
587
588    /// Cancel this read if it hasn't already completed.
589    ///
590    /// This method will initiate a cancellation operation for this active
591    /// read. This may race with the actual read itself and so this may actually
592    /// complete with some results.
593    ///
594    /// The final result of cancellation is returned, along with the original
595    /// buffer.
596    ///
597    /// # Panics
598    ///
599    /// Panics if the operation has already been completed via `Future::poll`,
600    /// or if this method is called twice.
601    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
602        self.pin_project().cancel()
603    }
604}