wit_bindgen/rt/async_support/
stream_support.rs

1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7    crate::rt::Cleanup,
8    std::{
9        alloc::Layout,
10        fmt,
11        future::Future,
12        marker,
13        pin::Pin,
14        ptr,
15        sync::atomic::{AtomicU32, Ordering::Relaxed},
16        task::{Context, Poll},
17        vec::Vec,
18    },
19};
20
/// Table of type-specific operations the stream implementation needs for `T`.
///
/// Instances are primarily produced by `wit_bindgen::generate!`.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// Canonical ABI in-memory layout (size and alignment) of one `T` value.
    pub layout: Layout,

    /// Optional callback that lowers an owned `T` into the memory at `dst`.
    ///
    /// Invoking it hands ownership of every list and resource inside the
    /// value over to `dst`, possibly reallocating when `T`'s Rust layout is
    /// not the canonical ABI layout.
    ///
    /// `None` means `T` is laid out identically in Rust and in the canonical
    /// ABI, so the lower/lift step can be skipped entirely.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback that frees any owned lists stored at `dst` once a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means `T` contains no lists.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Counterpart of `lower`: reads a `T` out of canonical ABI memory.
    ///
    /// As with `lower`, `None` means the Rust and canonical ABI
    /// representations of `T` coincide.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// Raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// Raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// Raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// Raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// Raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// Raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// Raw `stream.new` intrinsic; the two `u32` handles come back packed
    /// into a single `u64`.
    pub new: unsafe extern "C" fn() -> u64,
}
66
67/// Helper function to create a new read/write pair for a component model
68/// stream.
69pub unsafe fn stream_new<T>(
70    vtable: &'static StreamVtable<T>,
71) -> (StreamWriter<T>, StreamReader<T>) {
72    unsafe {
73        let handles = (vtable.new)();
74        let reader = handles as u32;
75        let writer = (handles >> 32) as u32;
76        rtdebug!("stream.new() = [{writer}, {reader}]");
77        (
78            StreamWriter::new(writer, vtable),
79            StreamReader::new(reader, vtable),
80        )
81    }
82}
83
84/// Represents the writable end of a Component Model `stream`.
85pub struct StreamWriter<T: 'static> {
86    handle: u32,
87    vtable: &'static StreamVtable<T>,
88    done: bool,
89}
90
91impl<T> StreamWriter<T> {
92    #[doc(hidden)]
93    pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
94        Self {
95            handle,
96            vtable,
97            done: false,
98        }
99    }
100
101    /// Initiate a write of the `values` provided into this stream.
102    ///
103    /// This method is akin to an `async fn` except that the returned
104    /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
105    /// to re-acquire undelivered values.
106    ///
107    /// This method will perform at most a single write of the `values`
108    /// provided. The returned future will resolve once the write has completed.
109    ///
110    /// # Return Values
111    ///
112    /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
113    /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
114    /// were sent from `values` into this writer. A result of
115    /// `StreamResult::Dropped` means that no values were sent and the other side
116    /// has hung-up and sending values will no longer be possible.
117    ///
118    /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
119    /// original `values` provided here. That can be used to re-acquire `values`
120    /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
121    /// internal cursor of how many values have been written and if the write
122    /// should be resumed to write the entire buffer then the
123    /// [`StreamWriter::write_buf`] method can be used to resume writing at the
124    /// next value in the buffer.
125    ///
126    /// # Cancellation
127    ///
128    /// The returned [`StreamWrite`] future can be cancelled like any other Rust
129    /// future via `drop`, but this means that `values` will be lost within the
130    /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
131    /// in-progress write that is being done with `values`. This is effectively
132    /// a way of forcing the future to immediately resolve.
133    ///
134    /// Note that if this future is cancelled via `drop` it does not mean that
135    /// no values were sent. It may be possible that values were still sent
136    /// despite being cancelled. Cancelling a write and determining what
137    /// happened must be done with [`StreamWrite::cancel`].
138    pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
139        self.write_buf(AbiBuffer::new(values, self.vtable))
140    }
141
142    /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
143    /// instead of `Vec<T>`.
144    pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
145        StreamWrite {
146            op: WaitableOperation::new(StreamWriteOp(marker::PhantomData), (self, values)),
147        }
148    }
149
150    /// Writes all of the `values` provided into this stream.
151    ///
152    /// This is a higher-level method than [`StreamWriter::write`] and does not
153    /// expose cancellation for example. This will successively attempt to write
154    /// all of `values` provided into this stream. Upon completion the same
155    /// vector will be returned and any remaining elements in the vector were
156    /// not sent because the stream was dropped.
157    pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
158        // Perform an initial write which converts `values` into `AbiBuffer`.
159        let (mut status, mut buf) = self.write(values).await;
160
161        // While the previous write completed and there's still remaining items
162        // in the buffer, perform another write.
163        while let StreamResult::Complete(_) = status {
164            if buf.remaining() == 0 {
165                break;
166            }
167            (status, buf) = self.write_buf(buf).await;
168
169            // FIXME(WebAssembly/component-model#490)
170            if status == StreamResult::Cancelled {
171                status = StreamResult::Complete(0);
172            }
173        }
174
175        // Return back any values that weren't written by shifting them to the
176        // front of the returned vector.
177        assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
178        buf.into_vec()
179    }
180
181    /// Writes the singular `value` provided
182    ///
183    /// This is a higher-level method than [`StreamWriter::write`] and does not
184    /// expose cancellation for example. This will attempt to send `value` on
185    /// this stream.
186    ///
187    /// If the other end hangs up then the value is returned back as
188    /// `Some(value)`, otherwise `None` is returned indicating the value was
189    /// sent.
190    pub async fn write_one(&mut self, value: T) -> Option<T> {
191        // TODO: can probably be a bit more efficient about this and avoid
192        // moving `value` onto the heap in some situations, but that's left as
193        // an optimization for later.
194        self.write_all(std::vec![value]).await.pop()
195    }
196}
197
198impl<T> fmt::Debug for StreamWriter<T> {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        f.debug_struct("StreamWriter")
201            .field("handle", &self.handle)
202            .finish()
203    }
204}
205
206impl<T> Drop for StreamWriter<T> {
207    fn drop(&mut self) {
208        rtdebug!("stream.drop-writable({})", self.handle);
209        unsafe {
210            (self.vtable.drop_writable)(self.handle);
211        }
212    }
213}
214
215/// Represents a write operation which may be cancelled prior to completion.
216pub struct StreamWrite<'a, T: 'static> {
217    op: WaitableOperation<StreamWriteOp<'a, T>>,
218}
219
220struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);
221
/// Outcome of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// as produced by the [`StreamWrite`] and [`StreamRead`] futures.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamResult {
    /// The given number of values were transferred successfully.
    ///
    /// For a write this counts items written; for a read, items read.
    Complete(usize),
    /// Nothing was transferred because the other end dropped its handle.
    Dropped,
    /// Nothing was transferred because the operation was cancelled.
    Cancelled,
}
236
237unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
238where
239    T: 'static,
240{
241    type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
242    type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
243    type Result = (StreamResult, AbiBuffer<T>);
244    type Cancel = (StreamResult, AbiBuffer<T>);
245
246    fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) {
247        if writer.done {
248            return (DROPPED, (writer, buf));
249        }
250
251        let (ptr, len) = buf.abi_ptr_and_len();
252        // SAFETY: sure hope this is safe, everything in this module and
253        // `AbiBuffer` is trying to make this safe.
254        let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
255        rtdebug!(
256            "stream.write({}, {ptr:?}, {len}) = {code:#x}",
257            writer.handle
258        );
259        (code, (writer, buf))
260    }
261
262    fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel {
263        (StreamResult::Cancelled, buf)
264    }
265
266    fn in_progress_update(
267        &mut self,
268        (writer, mut buf): Self::InProgress,
269        code: u32,
270    ) -> Result<Self::Result, Self::InProgress> {
271        match ReturnCode::decode(code) {
272            ReturnCode::Blocked => Err((writer, buf)),
273            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
274            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
275            code @ (ReturnCode::Completed(amt)
276            | ReturnCode::Dropped(amt)
277            | ReturnCode::Cancelled(amt)) => {
278                let amt = amt.try_into().unwrap();
279                buf.advance(amt);
280                if let ReturnCode::Dropped(_) = code {
281                    writer.done = true;
282                }
283                Ok((StreamResult::Complete(amt), buf))
284            }
285        }
286    }
287
288    fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 {
289        writer.handle
290    }
291
292    fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 {
293        // SAFETY: we're managing `writer` and all the various operational bits,
294        // so this relies on `WaitableOperation` being safe.
295        let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
296        rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
297        code
298    }
299
300    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
301        result
302    }
303}
304
305impl<T: 'static> Future for StreamWrite<'_, T> {
306    type Output = (StreamResult, AbiBuffer<T>);
307
308    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
309        self.pin_project().poll_complete(cx)
310    }
311}
312
313impl<'a, T: 'static> StreamWrite<'a, T> {
314    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
315        // SAFETY: we've chosen that when `Self` is pinned that it translates to
316        // always pinning the inner field, so that's codified here.
317        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
318    }
319
320    /// Cancel this write if it hasn't already completed.
321    ///
322    /// This method can be used to cancel a write-in-progress and re-acquire
323    /// values being sent. Note that the result here may still indicate that
324    /// some values were written if the race to cancel the write was lost.
325    ///
326    /// # Panics
327    ///
328    /// Panics if the operation has already been completed via `Future::poll`,
329    /// or if this method is called twice.
330    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
331        self.pin_project().cancel()
332    }
333}
334
335/// Represents the readable end of a Component Model `stream`.
336pub struct StreamReader<T: 'static> {
337    handle: AtomicU32,
338    vtable: &'static StreamVtable<T>,
339    done: bool,
340}
341
342impl<T> fmt::Debug for StreamReader<T> {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        f.debug_struct("StreamReader")
345            .field("handle", &self.handle)
346            .finish()
347    }
348}
349
350impl<T> StreamReader<T> {
351    #[doc(hidden)]
352    pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
353        Self {
354            handle: AtomicU32::new(handle),
355            vtable,
356            done: false,
357        }
358    }
359
360    #[doc(hidden)]
361    pub fn take_handle(&self) -> u32 {
362        let ret = self.opt_handle().unwrap();
363        self.handle.store(u32::MAX, Relaxed);
364        ret
365    }
366
367    fn handle(&self) -> u32 {
368        self.opt_handle().unwrap()
369    }
370
371    fn opt_handle(&self) -> Option<u32> {
372        match self.handle.load(Relaxed) {
373            u32::MAX => None,
374            other => Some(other),
375        }
376    }
377
378    /// Starts a new read operation on this stream into `buf`.
379    ///
380    /// This method will read values into the spare capacity of the `buf`
381    /// provided. If `buf` has no spare capacity then this will be equivalent
382    /// to a zero-length read.
383    ///
384    /// Upon completion the `buf` will be yielded back to the caller via the
385    /// completion of the [`StreamRead`] future.
386    ///
387    /// # Cancellation
388    ///
389    /// Cancelling the returned future can be done with `drop` like all Rust
390    /// futures, but it does not mean that no values were read. To accurately
391    /// determine if values were read the [`StreamRead::cancel`] method must be
392    /// used.
393    pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
394        StreamRead {
395            op: WaitableOperation::new(StreamReadOp(marker::PhantomData), (self, buf)),
396        }
397    }
398
399    /// Reads a single item from this stream.
400    ///
401    /// This is a higher-level method than [`StreamReader::read`] in that it
402    /// reads only a single item and does not expose control over cancellation.
403    pub async fn next(&mut self) -> Option<T> {
404        // TODO: should amortize this allocation and avoid doing it every time.
405        // Or somehow perhaps make this more optimal.
406        let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
407        buf.pop()
408    }
409
410    /// Reads all items from this stream and returns the list.
411    ///
412    /// This method will read all remaining items from this stream into a list
413    /// and await the stream to be dropped.
414    pub async fn collect(mut self) -> Vec<T> {
415        let mut ret = Vec::new();
416        loop {
417            // If there's no more spare capacity then reserve room for one item
418            // which should trigger `Vec`'s built-in resizing logic, which will
419            // free up likely more capacity than just one slot.
420            if ret.len() == ret.capacity() {
421                ret.reserve(1);
422            }
423            let (status, buf) = self.read(ret).await;
424            ret = buf;
425            match status {
426                StreamResult::Complete(_) => {}
427                StreamResult::Dropped => break,
428                StreamResult::Cancelled => unreachable!(),
429            }
430        }
431        ret
432    }
433}
434
435impl<T> Drop for StreamReader<T> {
436    fn drop(&mut self) {
437        let Some(handle) = self.opt_handle() else {
438            return;
439        };
440        unsafe {
441            rtdebug!("stream.drop-readable({})", handle);
442            (self.vtable.drop_readable)(handle);
443        }
444    }
445}
446
447/// Represents a read operation which may be cancelled prior to completion.
448pub struct StreamRead<'a, T: 'static> {
449    op: WaitableOperation<StreamReadOp<'a, T>>,
450}
451
452struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);
453
454unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
455where
456    T: 'static,
457{
458    type Start = (&'a mut StreamReader<T>, Vec<T>);
459    type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
460    type Result = (StreamResult, Vec<T>);
461    type Cancel = (StreamResult, Vec<T>);
462
463    fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
464        if reader.done {
465            return (DROPPED, (reader, buf, None));
466        }
467
468        let cap = buf.spare_capacity_mut();
469        let ptr;
470        let cleanup;
471        // If `T` requires a lifting operation, then allocate a slab of memory
472        // which will store the canonical ABI read. Otherwise we can use the
473        // raw capacity in `buf` itself.
474        if reader.vtable.lift.is_some() {
475            let layout = Layout::from_size_align(
476                reader.vtable.layout.size() * cap.len(),
477                reader.vtable.layout.align(),
478            )
479            .unwrap();
480            (ptr, cleanup) = Cleanup::new(layout);
481        } else {
482            ptr = cap.as_mut_ptr().cast();
483            cleanup = None;
484        }
485        // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
486        // persist with this async operation itself.
487        let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
488        rtdebug!(
489            "stream.read({}, {ptr:?}, {}) = {code:#x}",
490            reader.handle(),
491            cap.len()
492        );
493        (code, (reader, buf, cleanup))
494    }
495
496    fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel {
497        (StreamResult::Cancelled, buf)
498    }
499
500    fn in_progress_update(
501        &mut self,
502        (reader, mut buf, cleanup): Self::InProgress,
503        code: u32,
504    ) -> Result<Self::Result, Self::InProgress> {
505        match ReturnCode::decode(code) {
506            ReturnCode::Blocked => Err((reader, buf, cleanup)),
507
508            // Note that the `cleanup`, if any, is discarded here.
509            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
510
511            // When an in-progress read is successfully cancelled then the
512            // allocation that was being read into, if any, is just discarded.
513            //
514            // TODO: should maybe thread this around like `AbiBuffer` to cache
515            // the read allocation?
516            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
517
518            code @ (ReturnCode::Completed(amt)
519            | ReturnCode::Dropped(amt)
520            | ReturnCode::Cancelled(amt)) => {
521                let amt = usize::try_from(amt).unwrap();
522                let cur_len = buf.len();
523                assert!(amt <= buf.capacity() - cur_len);
524
525                match reader.vtable.lift {
526                    // With a `lift` operation this now requires reading `amt` items
527                    // from `cleanup` and pushing them into `buf`.
528                    Some(lift) => {
529                        let mut ptr = cleanup
530                            .as_ref()
531                            .map(|c| c.ptr.as_ptr())
532                            .unwrap_or(ptr::null_mut());
533                        for _ in 0..amt {
534                            unsafe {
535                                buf.push(lift(ptr));
536                                ptr = ptr.add(reader.vtable.layout.size());
537                            }
538                        }
539                    }
540
541                    // If no `lift` was necessary, then the results of this operation
542                    // were read directly into `buf`, so just update its length now that
543                    // values have been initialized.
544                    None => unsafe { buf.set_len(cur_len + amt) },
545                }
546
547                // Intentionally dispose of `cleanup` here as, if it was used, all
548                // allocations have been read from it and appended to `buf`.
549                drop(cleanup);
550                if let ReturnCode::Dropped(_) = code {
551                    reader.done = true;
552                }
553                Ok((StreamResult::Complete(amt), buf))
554            }
555        }
556    }
557
558    fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 {
559        reader.handle()
560    }
561
562    fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 {
563        // SAFETY: we're managing `reader` and all the various operational bits,
564        // so this relies on `WaitableOperation` being safe.
565        let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
566        rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
567        code
568    }
569
570    fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
571        result
572    }
573}
574
575impl<T: 'static> Future for StreamRead<'_, T> {
576    type Output = (StreamResult, Vec<T>);
577
578    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
579        self.pin_project().poll_complete(cx)
580    }
581}
582
583impl<'a, T> StreamRead<'a, T> {
584    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
585        // SAFETY: we've chosen that when `Self` is pinned that it translates to
586        // always pinning the inner field, so that's codified here.
587        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
588    }
589
590    /// Cancel this read if it hasn't already completed.
591    ///
592    /// This method will initiate a cancellation operation for this active
593    /// read. This may race with the actual read itself and so this may actually
594    /// complete with some results.
595    ///
596    /// The final result of cancellation is returned, along with the original
597    /// buffer.
598    ///
599    /// # Panics
600    ///
601    /// Panics if the operation has already been completed via `Future::poll`,
602    /// or if this method is called twice.
603    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
604        self.pin_project().cancel()
605    }
606}