wit_bindgen/rt/async_support/stream_support.rs
1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7 crate::rt::Cleanup,
8 std::{
9 alloc::Layout,
10 fmt,
11 future::Future,
12 marker,
13 pin::Pin,
14 ptr,
15 sync::atomic::{AtomicU32, Ordering::Relaxed},
16 task::{Context, Poll},
17 vec::Vec,
18 },
19};
20
21/// Operations that a stream requires throughout the implementation.
22///
23/// This is generated by `wit_bindgen::generate!` primarily.
24#[doc(hidden)]
25pub struct StreamVtable<T> {
26 /// The in-memory canonical ABI layout of a single value of `T`.
27 pub layout: Layout,
28
29 /// An optional callback where if provided will lower an owned `T` value
30 /// into the `dst` pointer.
31 ///
32 /// If this is called the ownership of all of `T`'s lists and resources are
33 /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
34 /// the canonical ABI layout.
35 ///
36 /// If this is `None` then it means that `T` has the same layout in-memory
37 /// in Rust as it does in the canonical ABI. In such a situation the
38 /// lower/lift operation can be dropped.
39 pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,
40
41 /// Callback used to deallocate any owned lists in `dst` after a value has
42 /// been successfully sent along a stream.
43 ///
44 /// `None` means that `T` has no lists internally.
45 pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,
46
47 /// Dual of `lower`, and like `lower` if this is missing then it means that
48 /// `T` has the same in-memory representation in Rust and the canonical ABI.
49 pub lift: Option<unsafe fn(dst: *mut u8) -> T>,
50
51 /// The raw `stream.write` intrinsic.
52 pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
53 /// The raw `stream.read` intrinsic.
54 pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
55 /// The raw `stream.cancel-write` intrinsic.
56 pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
57 /// The raw `stream.cancel-read` intrinsic.
58 pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
59 /// The raw `stream.drop-writable` intrinsic.
60 pub drop_writable: unsafe extern "C" fn(stream: u32),
61 /// The raw `stream.drop-readable` intrinsic.
62 pub drop_readable: unsafe extern "C" fn(stream: u32),
63 /// The raw `stream.new` intrinsic.
64 pub new: unsafe extern "C" fn() -> u64,
65}
66
67/// Helper function to create a new read/write pair for a component model
68/// stream.
69pub unsafe fn stream_new<T>(
70 vtable: &'static StreamVtable<T>,
71) -> (StreamWriter<T>, StreamReader<T>) {
72 unsafe {
73 let handles = (vtable.new)();
74 let reader = handles as u32;
75 let writer = (handles >> 32) as u32;
76 rtdebug!("stream.new() = [{writer}, {reader}]");
77 (
78 StreamWriter::new(writer, vtable),
79 StreamReader::new(reader, vtable),
80 )
81 }
82}
83
84/// Represents the writable end of a Component Model `stream`.
85pub struct StreamWriter<T: 'static> {
86 handle: u32,
87 vtable: &'static StreamVtable<T>,
88 done: bool,
89}
90
91impl<T> StreamWriter<T> {
92 #[doc(hidden)]
93 pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
94 Self {
95 handle,
96 vtable,
97 done: false,
98 }
99 }
100
101 /// Initiate a write of the `values` provided into this stream.
102 ///
103 /// This method is akin to an `async fn` except that the returned
104 /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
105 /// to re-acquire undelivered values.
106 ///
107 /// This method will perform at most a single write of the `values`
108 /// provided. The returned future will resolve once the write has completed.
109 ///
110 /// # Return Values
111 ///
112 /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
113 /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
114 /// were sent from `values` into this writer. A result of
115 /// `StreamResult::Dropped` means that no values were sent and the other side
116 /// has hung-up and sending values will no longer be possible.
117 ///
118 /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
119 /// original `values` provided here. That can be used to re-acquire `values`
120 /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
121 /// internal cursor of how many values have been written and if the write
122 /// should be resumed to write the entire buffer then the
123 /// [`StreamWriter::write_buf`] method can be used to resume writing at the
124 /// next value in the buffer.
125 ///
126 /// # Cancellation
127 ///
128 /// The returned [`StreamWrite`] future can be cancelled like any other Rust
129 /// future via `drop`, but this means that `values` will be lost within the
130 /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
131 /// in-progress write that is being done with `values`. This is effectively
132 /// a way of forcing the future to immediately resolve.
133 ///
134 /// Note that if this future is cancelled via `drop` it does not mean that
135 /// no values were sent. It may be possible that values were still sent
136 /// despite being cancelled. Cancelling a write and determining what
137 /// happened must be done with [`StreamWrite::cancel`].
138 pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
139 self.write_buf(AbiBuffer::new(values, self.vtable))
140 }
141
142 /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
143 /// instead of `Vec<T>`.
144 pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
145 StreamWrite {
146 op: WaitableOperation::new((self, values)),
147 }
148 }
149
150 /// Writes all of the `values` provided into this stream.
151 ///
152 /// This is a higher-level method than [`StreamWriter::write`] and does not
153 /// expose cancellation for example. This will successively attempt to write
154 /// all of `values` provided into this stream. Upon completion the same
155 /// vector will be returned and any remaining elements in the vector were
156 /// not sent because the stream was dropped.
157 pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
158 // Perform an initial write which converts `values` into `AbiBuffer`.
159 let (mut status, mut buf) = self.write(values).await;
160
161 // While the previous write completed and there's still remaining items
162 // in the buffer, perform another write.
163 while let StreamResult::Complete(_) = status {
164 if buf.remaining() == 0 {
165 break;
166 }
167 (status, buf) = self.write_buf(buf).await;
168
169 // FIXME(WebAssembly/component-model#490)
170 if status == StreamResult::Cancelled {
171 status = StreamResult::Complete(0);
172 }
173 }
174
175 // Return back any values that weren't written by shifting them to the
176 // front of the returned vector.
177 assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
178 buf.into_vec()
179 }
180
181 /// Writes the singular `value` provided
182 ///
183 /// This is a higher-level method than [`StreamWriter::write`] and does not
184 /// expose cancellation for example. This will attempt to send `value` on
185 /// this stream.
186 ///
187 /// If the other end hangs up then the value is returned back as
188 /// `Some(value)`, otherwise `None` is returned indicating the value was
189 /// sent.
190 pub async fn write_one(&mut self, value: T) -> Option<T> {
191 // TODO: can probably be a bit more efficient about this and avoid
192 // moving `value` onto the heap in some situations, but that's left as
193 // an optimization for later.
194 self.write_all(std::vec![value]).await.pop()
195 }
196}
197
198impl<T> fmt::Debug for StreamWriter<T> {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 f.debug_struct("StreamWriter")
201 .field("handle", &self.handle)
202 .finish()
203 }
204}
205
206impl<T> Drop for StreamWriter<T> {
207 fn drop(&mut self) {
208 rtdebug!("stream.drop-writable({})", self.handle);
209 unsafe {
210 (self.vtable.drop_writable)(self.handle);
211 }
212 }
213}
214
215/// Represents a write operation which may be cancelled prior to completion.
216pub struct StreamWrite<'a, T: 'static> {
217 op: WaitableOperation<StreamWriteOp<'a, T>>,
218}
219
220struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);
221
222/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
223/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
224#[derive(Copy, Clone, PartialEq, Eq, Debug)]
225pub enum StreamResult {
226 /// The provided number of values were successfully transferred.
227 ///
228 /// For writes this is how many items were written, and for reads this is
229 /// how many items were read.
230 Complete(usize),
231 /// No values were written, the other end has dropped its handle.
232 Dropped,
233 /// No values were written, the operation was cancelled.
234 Cancelled,
235}
236
237unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
238where
239 T: 'static,
240{
241 type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
242 type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
243 type Result = (StreamResult, AbiBuffer<T>);
244 type Cancel = (StreamResult, AbiBuffer<T>);
245
246 fn start((writer, buf): Self::Start) -> (u32, Self::InProgress) {
247 if writer.done {
248 return (DROPPED, (writer, buf));
249 }
250
251 let (ptr, len) = buf.abi_ptr_and_len();
252 // SAFETY: sure hope this is safe, everything in this module and
253 // `AbiBuffer` is trying to make this safe.
254 let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
255 rtdebug!(
256 "stream.write({}, {ptr:?}, {len}) = {code:#x}",
257 writer.handle
258 );
259 (code, (writer, buf))
260 }
261
262 fn start_cancelled((_writer, buf): Self::Start) -> Self::Cancel {
263 (StreamResult::Cancelled, buf)
264 }
265
266 fn in_progress_update(
267 (writer, mut buf): Self::InProgress,
268 code: u32,
269 ) -> Result<Self::Result, Self::InProgress> {
270 match ReturnCode::decode(code) {
271 ReturnCode::Blocked => Err((writer, buf)),
272 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
273 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
274 code @ (ReturnCode::Completed(amt)
275 | ReturnCode::Dropped(amt)
276 | ReturnCode::Cancelled(amt)) => {
277 let amt = amt.try_into().unwrap();
278 buf.advance(amt);
279 if let ReturnCode::Dropped(_) = code {
280 writer.done = true;
281 }
282 Ok((StreamResult::Complete(amt), buf))
283 }
284 }
285 }
286
287 fn in_progress_waitable((writer, _): &Self::InProgress) -> u32 {
288 writer.handle
289 }
290
291 fn in_progress_cancel((writer, _): &Self::InProgress) -> u32 {
292 // SAFETY: we're managing `writer` and all the various operational bits,
293 // so this relies on `WaitableOperation` being safe.
294 let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
295 rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
296 code
297 }
298
299 fn result_into_cancel(result: Self::Result) -> Self::Cancel {
300 result
301 }
302}
303
304impl<T: 'static> Future for StreamWrite<'_, T> {
305 type Output = (StreamResult, AbiBuffer<T>);
306
307 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
308 self.pin_project().poll_complete(cx)
309 }
310}
311
312impl<'a, T: 'static> StreamWrite<'a, T> {
313 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
314 // SAFETY: we've chosen that when `Self` is pinned that it translates to
315 // always pinning the inner field, so that's codified here.
316 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
317 }
318
319 /// Cancel this write if it hasn't already completed.
320 ///
321 /// This method can be used to cancel a write-in-progress and re-acquire
322 /// values being sent. Note that the result here may still indicate that
323 /// some values were written if the race to cancel the write was lost.
324 ///
325 /// # Panics
326 ///
327 /// Panics if the operation has already been completed via `Future::poll`,
328 /// or if this method is called twice.
329 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
330 self.pin_project().cancel()
331 }
332}
333
334/// Represents the readable end of a Component Model `stream`.
335pub struct StreamReader<T: 'static> {
336 handle: AtomicU32,
337 vtable: &'static StreamVtable<T>,
338 done: bool,
339}
340
341impl<T> fmt::Debug for StreamReader<T> {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 f.debug_struct("StreamReader")
344 .field("handle", &self.handle)
345 .finish()
346 }
347}
348
349impl<T> StreamReader<T> {
350 #[doc(hidden)]
351 pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
352 Self {
353 handle: AtomicU32::new(handle),
354 vtable,
355 done: false,
356 }
357 }
358
359 #[doc(hidden)]
360 pub fn take_handle(&self) -> u32 {
361 let ret = self.opt_handle().unwrap();
362 self.handle.store(u32::MAX, Relaxed);
363 ret
364 }
365
366 fn handle(&self) -> u32 {
367 self.opt_handle().unwrap()
368 }
369
370 fn opt_handle(&self) -> Option<u32> {
371 match self.handle.load(Relaxed) {
372 u32::MAX => None,
373 other => Some(other),
374 }
375 }
376
377 /// Starts a new read operation on this stream into `buf`.
378 ///
379 /// This method will read values into the spare capacity of the `buf`
380 /// provided. If `buf` has no spare capacity then this will be equivalent
381 /// to a zero-length read.
382 ///
383 /// Upon completion the `buf` will be yielded back to the caller via the
384 /// completion of the [`StreamRead`] future.
385 ///
386 /// # Cancellation
387 ///
388 /// Cancelling the returned future can be done with `drop` like all Rust
389 /// futures, but it does not mean that no values were read. To accurately
390 /// determine if values were read the [`StreamRead::cancel`] method must be
391 /// used.
392 pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
393 StreamRead {
394 op: WaitableOperation::new((self, buf)),
395 }
396 }
397
398 /// Reads a single item from this stream.
399 ///
400 /// This is a higher-level method than [`StreamReader::read`] in that it
401 /// reads only a single item and does not expose control over cancellation.
402 pub async fn next(&mut self) -> Option<T> {
403 // TODO: should amortize this allocation and avoid doing it every time.
404 // Or somehow perhaps make this more optimal.
405 let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
406 buf.pop()
407 }
408
409 /// Reads all items from this stream and returns the list.
410 ///
411 /// This method will read all remaining items from this stream into a list
412 /// and await the stream to be dropped.
413 pub async fn collect(mut self) -> Vec<T> {
414 let mut ret = Vec::new();
415 loop {
416 // If there's no more spare capacity then reserve room for one item
417 // which should trigger `Vec`'s built-in resizing logic, which will
418 // free up likely more capacity than just one slot.
419 if ret.len() == ret.capacity() {
420 ret.reserve(1);
421 }
422 let (status, buf) = self.read(ret).await;
423 ret = buf;
424 match status {
425 StreamResult::Complete(_) => {}
426 StreamResult::Dropped => break,
427 StreamResult::Cancelled => unreachable!(),
428 }
429 }
430 ret
431 }
432}
433
434impl<T> Drop for StreamReader<T> {
435 fn drop(&mut self) {
436 let Some(handle) = self.opt_handle() else {
437 return;
438 };
439 unsafe {
440 rtdebug!("stream.drop-readable({})", handle);
441 (self.vtable.drop_readable)(handle);
442 }
443 }
444}
445
446/// Represents a read operation which may be cancelled prior to completion.
447pub struct StreamRead<'a, T: 'static> {
448 op: WaitableOperation<StreamReadOp<'a, T>>,
449}
450
451struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);
452
453unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
454where
455 T: 'static,
456{
457 type Start = (&'a mut StreamReader<T>, Vec<T>);
458 type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
459 type Result = (StreamResult, Vec<T>);
460 type Cancel = (StreamResult, Vec<T>);
461
462 fn start((reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
463 if reader.done {
464 return (DROPPED, (reader, buf, None));
465 }
466
467 let cap = buf.spare_capacity_mut();
468 let ptr;
469 let cleanup;
470 // If `T` requires a lifting operation, then allocate a slab of memory
471 // which will store the canonical ABI read. Otherwise we can use the
472 // raw capacity in `buf` itself.
473 if reader.vtable.lift.is_some() {
474 let layout = Layout::from_size_align(
475 reader.vtable.layout.size() * cap.len(),
476 reader.vtable.layout.align(),
477 )
478 .unwrap();
479 (ptr, cleanup) = Cleanup::new(layout);
480 } else {
481 ptr = cap.as_mut_ptr().cast();
482 cleanup = None;
483 }
484 // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
485 // persist with this async operation itself.
486 let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
487 rtdebug!(
488 "stream.read({}, {ptr:?}, {}) = {code:#x}",
489 reader.handle(),
490 cap.len()
491 );
492 (code, (reader, buf, cleanup))
493 }
494
495 fn start_cancelled((_, buf): Self::Start) -> Self::Cancel {
496 (StreamResult::Cancelled, buf)
497 }
498
499 fn in_progress_update(
500 (reader, mut buf, cleanup): Self::InProgress,
501 code: u32,
502 ) -> Result<Self::Result, Self::InProgress> {
503 match ReturnCode::decode(code) {
504 ReturnCode::Blocked => Err((reader, buf, cleanup)),
505
506 // Note that the `cleanup`, if any, is discarded here.
507 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
508
509 // When an in-progress read is successfully cancelled then the
510 // allocation that was being read into, if any, is just discarded.
511 //
512 // TODO: should maybe thread this around like `AbiBuffer` to cache
513 // the read allocation?
514 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
515
516 code @ (ReturnCode::Completed(amt)
517 | ReturnCode::Dropped(amt)
518 | ReturnCode::Cancelled(amt)) => {
519 let amt = usize::try_from(amt).unwrap();
520 let cur_len = buf.len();
521 assert!(amt <= buf.capacity() - cur_len);
522
523 match reader.vtable.lift {
524 // With a `lift` operation this now requires reading `amt` items
525 // from `cleanup` and pushing them into `buf`.
526 Some(lift) => {
527 let mut ptr = cleanup
528 .as_ref()
529 .map(|c| c.ptr.as_ptr())
530 .unwrap_or(ptr::null_mut());
531 for _ in 0..amt {
532 unsafe {
533 buf.push(lift(ptr));
534 ptr = ptr.add(reader.vtable.layout.size());
535 }
536 }
537 }
538
539 // If no `lift` was necessary, then the results of this operation
540 // were read directly into `buf`, so just update its length now that
541 // values have been initialized.
542 None => unsafe { buf.set_len(cur_len + amt) },
543 }
544
545 // Intentionally dispose of `cleanup` here as, if it was used, all
546 // allocations have been read from it and appended to `buf`.
547 drop(cleanup);
548 if let ReturnCode::Dropped(_) = code {
549 reader.done = true;
550 }
551 Ok((StreamResult::Complete(amt), buf))
552 }
553 }
554 }
555
556 fn in_progress_waitable((reader, ..): &Self::InProgress) -> u32 {
557 reader.handle()
558 }
559
560 fn in_progress_cancel((reader, ..): &Self::InProgress) -> u32 {
561 // SAFETY: we're managing `reader` and all the various operational bits,
562 // so this relies on `WaitableOperation` being safe.
563 let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
564 rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
565 code
566 }
567
568 fn result_into_cancel(result: Self::Result) -> Self::Cancel {
569 result
570 }
571}
572
573impl<T: 'static> Future for StreamRead<'_, T> {
574 type Output = (StreamResult, Vec<T>);
575
576 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
577 self.pin_project().poll_complete(cx)
578 }
579}
580
581impl<'a, T> StreamRead<'a, T> {
582 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
583 // SAFETY: we've chosen that when `Self` is pinned that it translates to
584 // always pinning the inner field, so that's codified here.
585 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
586 }
587
588 /// Cancel this read if it hasn't already completed.
589 ///
590 /// This method will initiate a cancellation operation for this active
591 /// read. This may race with the actual read itself and so this may actually
592 /// complete with some results.
593 ///
594 /// The final result of cancellation is returned, along with the original
595 /// buffer.
596 ///
597 /// # Panics
598 ///
599 /// Panics if the operation has already been completed via `Future::poll`,
600 /// or if this method is called twice.
601 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
602 self.pin_project().cancel()
603 }
604}